Skip to content

Fix InferenceModel deletion logic #393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ func run() error {
return err
}

ctx := ctrl.SetupSignalHandler()

// Setup runner.
datastore := datastore.NewDatastore()
provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
Expand All @@ -165,7 +167,7 @@ func run() error {
CertPath: *certPath,
Provider: provider,
}
if err := serverRunner.SetupWithManager(mgr); err != nil {
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
setupLog.Error(err, "Failed to setup ext-proc controllers")
return err
}
Expand All @@ -188,7 +190,7 @@ func run() error {

// Start the manager. This blocks until a signal is received.
setupLog.Info("Controller manager starting")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "Error starting controller manager")
return err
}
Expand Down
50 changes: 21 additions & 29 deletions pkg/epp/backend/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package backend
import (
"context"
"errors"
"sync"
"testing"
"time"

Expand All @@ -37,6 +36,9 @@ var (
Name: "pod1",
},
},
}
pod1WithMetrics = &datastore.PodMetrics{
Pod: pod1.Pod,
Metrics: datastore.Metrics{
WaitingQueueSize: 0,
KVCacheUsagePercent: 0.2,
Expand All @@ -53,6 +55,9 @@ var (
Name: "pod2",
},
},
}
pod2WithMetrics = &datastore.PodMetrics{
Pod: pod2.Pod,
Metrics: datastore.Metrics{
WaitingQueueSize: 1,
KVCacheUsagePercent: 0.2,
Expand All @@ -69,35 +74,30 @@ func TestProvider(t *testing.T) {
tests := []struct {
name string
pmc PodMetricsClient
datastore datastore.Datastore
storePods []*datastore.PodMetrics
want []*datastore.PodMetrics
}{
{
name: "Probing metrics success",
pmc: &FakePodMetricsClient{
Res: map[types.NamespacedName]*datastore.PodMetrics{
pod1.NamespacedName: pod1,
pod2.NamespacedName: pod2,
pod1.NamespacedName: pod1WithMetrics,
pod2.NamespacedName: pod2WithMetrics,
},
},
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
want: []*datastore.PodMetrics{
pod1,
pod2,
},
storePods: []*datastore.PodMetrics{pod1, pod2},
want: []*datastore.PodMetrics{pod1WithMetrics, pod2WithMetrics},
},
{
name: "Only pods in the datastore are probed",
pmc: &FakePodMetricsClient{
Res: map[types.NamespacedName]*datastore.PodMetrics{
pod1.NamespacedName: pod1,
pod2.NamespacedName: pod2,
pod1.NamespacedName: pod1WithMetrics,
pod2.NamespacedName: pod2WithMetrics,
},
},
datastore: datastore.NewFakeDatastore(populateMap(pod1), nil, nil),
want: []*datastore.PodMetrics{
pod1,
},
storePods: []*datastore.PodMetrics{pod1},
want: []*datastore.PodMetrics{pod1WithMetrics},
},
{
name: "Probing metrics error",
Expand All @@ -106,13 +106,12 @@ func TestProvider(t *testing.T) {
pod2.NamespacedName: errors.New("injected error"),
},
Res: map[types.NamespacedName]*datastore.PodMetrics{
pod1.NamespacedName: pod1,
pod1.NamespacedName: pod1WithMetrics,
},
},
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),

storePods: []*datastore.PodMetrics{pod1, pod2},
want: []*datastore.PodMetrics{
pod1,
pod1WithMetrics,
// Failed to fetch pod2 metrics so it remains the default values.
{
Pod: datastore.Pod{NamespacedName: pod2.NamespacedName},
Expand All @@ -128,12 +127,13 @@ func TestProvider(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
p := NewProvider(test.pmc, test.datastore)
ds := datastore.NewFakeDatastore(test.storePods, nil, nil)
p := NewProvider(test.pmc, ds)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
_ = p.Init(ctx, time.Millisecond, time.Millisecond)
assert.EventuallyWithT(t, func(t *assert.CollectT) {
metrics := test.datastore.PodGetAll()
metrics := ds.PodGetAll()
diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *datastore.PodMetrics) bool {
return a.String() < b.String()
}))
Expand All @@ -142,11 +142,3 @@ func TestProvider(t *testing.T) {
})
}
}

// populateMap builds a sync.Map keyed by each pod's NamespacedName. Every
// stored value is a freshly allocated PodMetrics that carries over only the
// pod's NamespacedName and Address; the remaining fields are left at their
// zero values.
func populateMap(pods ...*datastore.PodMetrics) *sync.Map {
	store := new(sync.Map)
	for _, p := range pods {
		entry := &datastore.PodMetrics{
			Pod: datastore.Pod{
				NamespacedName: p.NamespacedName,
				Address:        p.Address,
			},
		}
		store.Store(p.NamespacedName, entry)
	}
	return store
}
86 changes: 61 additions & 25 deletions pkg/epp/controller/inferencemodel_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package controller

import (
"context"
"fmt"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -43,44 +43,80 @@ type InferenceModelReconciler struct {
}

func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
loggerDefault := logger.V(logutil.DEFAULT)
loggerDefault.Info("Reconciling InferenceModel", "name", req.NamespacedName)
if req.Namespace != c.PoolNamespacedName.Namespace {
return ctrl.Result{}, nil
}
logger := log.FromContext(ctx).V(logutil.DEFAULT).WithValues("inferenceModel", req.Name)
ctx = ctrl.LoggerInto(ctx, logger)

logger.Info("Reconciling InferenceModel")

infModel := &v1alpha2.InferenceModel{}
notFound := false
if err := c.Get(ctx, req.NamespacedName, infModel); err != nil {
if errors.IsNotFound(err) {
loggerDefault.Info("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName)
c.Datastore.ModelDelete(infModel.Spec.ModelName)
return ctrl.Result{}, nil
if !errors.IsNotFound(err) {
logger.Error(err, "Unable to get InferenceModel")
return ctrl.Result{}, err
}
loggerDefault.Error(err, "Unable to get InferenceModel", "name", req.NamespacedName)
notFound = true
}

if notFound || !infModel.DeletionTimestamp.IsZero() || infModel.Spec.PoolRef.Name != c.PoolNamespacedName.Name {
Copy link
Contributor Author

@ahg-g ahg-g Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modelName is currently mutable, but we plan to make it immutable, and so I didn't address the case where the infModel changes modelName here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a use case for mutating modelName?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make it immutable to guarantee that it doesn't change, otherwise the epp would behave in an unpredictable way.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

modelName is currently immutable, but we plan to make it mutable

I think the intended statement was:

modelName is currently mutable, but we plan to make it immutable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, yes :)

I opened #408 to track the work to make it immutable.

// InferenceModel object got deleted or changed the referenced pool.
err := c.handleModelDeleted(ctx, req.NamespacedName)
return ctrl.Result{}, err
} else if !infModel.DeletionTimestamp.IsZero() {
loggerDefault.Info("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName)
c.Datastore.ModelDelete(infModel.Spec.ModelName)
return ctrl.Result{}, nil
}

c.updateDatastore(logger, infModel)
// Add or update if the InferenceModel instance has a creation timestamp older than the existing entry of the model.
logger = logger.WithValues("poolRef", infModel.Spec.PoolRef).WithValues("modelName", infModel.Spec.ModelName)
if !c.Datastore.ModelSetIfOlder(infModel) {
logger.Info("Skipping InferenceModel, existing instance has older creation timestamp")

}
logger.Info("Added/Updated InferenceModel")

return ctrl.Result{}, nil
}

func (c *InferenceModelReconciler) updateDatastore(logger logr.Logger, infModel *v1alpha2.InferenceModel) {
loggerDefault := logger.V(logutil.DEFAULT)
func (c *InferenceModelReconciler) handleModelDeleted(ctx context.Context, req types.NamespacedName) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this func here is what causes the need for the dual dictionary.

I could see a high amount of adapter churn, but that will just augment the existing infModel, I'm not sure there will be a high amount of infModel churn. What do you think about paying the O(n) cost for now, with the benefit being a simpler, and less error prone code base? The double dict makes me worry.

Points in favor:

  • Only take the O(n) hit on deletes
  • Reconciliation is on a separate goroutine so it won't hang EPP
  • This is for a delete, so a bit longer latency does not impact a service (activating a duplicate is kind of a band-aid for the short term anyway; when we have a proper controller it would just flip one to an Accepted state and that event would get picked up by this reconciler.)

Copy link
Contributor Author

@ahg-g ahg-g Feb 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good; I debated this as well. I wanted to address modelName updates, where we would need to do a lookup in the cache by object name every time to check whether a modelName change actually happened, but I believe we should make modelName immutable, so we don't need to handle this case here.

I didn't use sync.Map because in ModelSetIfOlder we need to do an atomic lookup+store, we also need to list and replace under the lock (similar to the PodResync case) to avoid race conditions between updating the model from the list and having the selected replacement object deleted.

logger := log.FromContext(ctx)

if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name {
loggerDefault.Info("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName)
loggerDefault.Info("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName)
c.Datastore.ModelSet(infModel)
return
// We look up and delete the modelName associated with this object, then search for
// other instances referencing the same modelName, if any exist, and store the oldest
// in its place. This ensures that the InferenceModel with the oldest creation
// timestamp is active.
existing, exists := c.Datastore.ModelDelete(req)
if !exists {
// No entry exists in the first place, nothing to do.
return nil
}
logger.Info("InferenceModel removed from datastore", "poolRef", existing.Spec.PoolRef, "modelName", existing.Spec.ModelName)

// TODO(#409): replace this backfill logic with one that is based on InferenceModel Ready conditions once those are set by an external controller.
updated, err := c.Datastore.ModelResync(ctx, c.Client, existing.Spec.ModelName)
if err != nil {
return err
}
if updated {
logger.Info("Model replaced.", "modelName", existing.Spec.ModelName)
}
loggerDefault.Info("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName)
// If we get here, the model is not relevant to this pool; remove it.
c.Datastore.ModelDelete(infModel.Spec.ModelName)
return nil
}

func (c *InferenceModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
// indexInferenceModelsByModelName is the index function registered under
// datastore.ModelNameIndexKey in SetupWithManager. For an InferenceModel it
// returns a one-element slice holding Spec.ModelName; for an object of any
// other type it returns nil.
func indexInferenceModelsByModelName(obj client.Object) []string {
	if infModel, isModel := obj.(*v1alpha2.InferenceModel); isModel {
		return []string{infModel.Spec.ModelName}
	}
	return nil
}

func (c *InferenceModelReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// Create an index on ModelName for InferenceModel objects.
indexer := mgr.GetFieldIndexer()
if err := indexer.IndexField(ctx, &v1alpha2.InferenceModel{}, datastore.ModelNameIndexKey, indexInferenceModelsByModelName); err != nil {
return fmt.Errorf("setting index on ModelName for InferenceModel: %w", err)
}
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha2.InferenceModel{}).
WithEventFilter(predicate.Funcs{
Expand Down
Loading