Skip to content

Commit e8a9590

Browse files
committed
Fix InferenceModel deletion logic
Currently the logic tracks the models by Spec.ModelName. Since this is not guaranteed to be unique within the cluster, we could run into two issues: 1) If the model name changes on the same InferenceModel object, we don't delete the original model entry in the datastore. 2) We don't enforce the semantics that the modelName with the oldest creation timestamp is retained. While the API assumes that this is enforced by another controller via the Ready condition, we don't have this controller yet, so the current behavior is unpredictable and depends on the order of InferenceModel events.
1 parent 432f5ed commit e8a9590

15 files changed

+783
-537
lines changed

cmd/epp/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ func run() error {
146146
return err
147147
}
148148

149+
ctx := ctrl.SetupSignalHandler()
150+
149151
// Setup runner.
150152
datastore := datastore.NewDatastore()
151153
provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
@@ -162,7 +164,7 @@ func run() error {
162164
CertPath: *certPath,
163165
Provider: provider,
164166
}
165-
if err := serverRunner.SetupWithManager(mgr); err != nil {
167+
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
166168
setupLog.Error(err, "Failed to setup ext-proc server")
167169
return err
168170
}
@@ -185,7 +187,7 @@ func run() error {
185187

186188
// Start the manager. This blocks until a signal is received.
187189
setupLog.Info("Controller manager starting")
188-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
190+
if err := mgr.Start(ctx); err != nil {
189191
setupLog.Error(err, "Error starting controller manager")
190192
return err
191193
}

pkg/epp/backend/provider_test.go

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package backend
1919
import (
2020
"context"
2121
"errors"
22-
"sync"
2322
"testing"
2423
"time"
2524

@@ -37,6 +36,9 @@ var (
3736
Name: "pod1",
3837
},
3938
},
39+
}
40+
pod1WithMetrics = &datastore.PodMetrics{
41+
Pod: pod1.Pod,
4042
Metrics: datastore.Metrics{
4143
WaitingQueueSize: 0,
4244
KVCacheUsagePercent: 0.2,
@@ -53,6 +55,9 @@ var (
5355
Name: "pod2",
5456
},
5557
},
58+
}
59+
pod2WithMetrics = &datastore.PodMetrics{
60+
Pod: pod2.Pod,
5661
Metrics: datastore.Metrics{
5762
WaitingQueueSize: 1,
5863
KVCacheUsagePercent: 0.2,
@@ -69,35 +74,30 @@ func TestProvider(t *testing.T) {
6974
tests := []struct {
7075
name string
7176
pmc PodMetricsClient
72-
datastore datastore.Datastore
77+
storePods []*datastore.PodMetrics
7378
want []*datastore.PodMetrics
7479
}{
7580
{
7681
name: "Probing metrics success",
7782
pmc: &FakePodMetricsClient{
7883
Res: map[types.NamespacedName]*datastore.PodMetrics{
79-
pod1.NamespacedName: pod1,
80-
pod2.NamespacedName: pod2,
84+
pod1.NamespacedName: pod1WithMetrics,
85+
pod2.NamespacedName: pod2WithMetrics,
8186
},
8287
},
83-
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
84-
want: []*datastore.PodMetrics{
85-
pod1,
86-
pod2,
87-
},
88+
storePods: []*datastore.PodMetrics{pod1, pod2},
89+
want: []*datastore.PodMetrics{pod1WithMetrics, pod2WithMetrics},
8890
},
8991
{
9092
name: "Only pods in the datastore are probed",
9193
pmc: &FakePodMetricsClient{
9294
Res: map[types.NamespacedName]*datastore.PodMetrics{
93-
pod1.NamespacedName: pod1,
94-
pod2.NamespacedName: pod2,
95+
pod1.NamespacedName: pod1WithMetrics,
96+
pod2.NamespacedName: pod2WithMetrics,
9597
},
9698
},
97-
datastore: datastore.NewFakeDatastore(populateMap(pod1), nil, nil),
98-
want: []*datastore.PodMetrics{
99-
pod1,
100-
},
99+
storePods: []*datastore.PodMetrics{pod1},
100+
want: []*datastore.PodMetrics{pod1WithMetrics},
101101
},
102102
{
103103
name: "Probing metrics error",
@@ -106,13 +106,12 @@ func TestProvider(t *testing.T) {
106106
pod2.NamespacedName: errors.New("injected error"),
107107
},
108108
Res: map[types.NamespacedName]*datastore.PodMetrics{
109-
pod1.NamespacedName: pod1,
109+
pod1.NamespacedName: pod1WithMetrics,
110110
},
111111
},
112-
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
113-
112+
storePods: []*datastore.PodMetrics{pod1, pod2},
114113
want: []*datastore.PodMetrics{
115-
pod1,
114+
pod1WithMetrics,
116115
// Failed to fetch pod2 metrics so it remains the default values.
117116
{
118117
Pod: datastore.Pod{NamespacedName: pod2.NamespacedName},
@@ -128,12 +127,13 @@ func TestProvider(t *testing.T) {
128127

129128
for _, test := range tests {
130129
t.Run(test.name, func(t *testing.T) {
131-
p := NewProvider(test.pmc, test.datastore)
130+
ds := datastore.NewFakeDatastore(test.storePods, nil, nil)
131+
p := NewProvider(test.pmc, ds)
132132
ctx, cancel := context.WithCancel(context.Background())
133133
defer cancel()
134134
_ = p.Init(ctx, time.Millisecond, time.Millisecond)
135135
assert.EventuallyWithT(t, func(t *assert.CollectT) {
136-
metrics := test.datastore.PodGetAll()
136+
metrics := ds.PodGetAll()
137137
diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *datastore.PodMetrics) bool {
138138
return a.String() < b.String()
139139
}))
@@ -142,11 +142,3 @@ func TestProvider(t *testing.T) {
142142
})
143143
}
144144
}
145-
146-
func populateMap(pods ...*datastore.PodMetrics) *sync.Map {
147-
newMap := &sync.Map{}
148-
for _, pod := range pods {
149-
newMap.Store(pod.NamespacedName, &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}})
150-
}
151-
return newMap
152-
}

pkg/epp/controller/inferencemodel_reconciler.go

Lines changed: 84 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package controller
1818

1919
import (
2020
"context"
21+
"fmt"
2122

22-
"github.com/go-logr/logr"
2323
"k8s.io/apimachinery/pkg/api/errors"
2424
"k8s.io/apimachinery/pkg/runtime"
2525
"k8s.io/apimachinery/pkg/types"
@@ -32,6 +32,10 @@ import (
3232
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3333
)
3434

35+
const (
36+
modelNameKey = "spec.modelName"
37+
)
38+
3539
type InferenceModelReconciler struct {
3640
client.Client
3741
Scheme *runtime.Scheme
@@ -44,45 +48,100 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
4448
if req.Namespace != c.PoolNamespacedName.Namespace {
4549
return ctrl.Result{}, nil
4650
}
51+
logger := log.FromContext(ctx).V(logutil.DEFAULT).WithValues("inferenceModel", req.Name)
52+
ctx = ctrl.LoggerInto(ctx, logger)
4753

48-
logger := log.FromContext(ctx)
49-
loggerDefault := logger.V(logutil.DEFAULT)
50-
loggerDefault.Info("Reconciling InferenceModel", "name", req.NamespacedName)
54+
logger.Info("Reconciling InferenceModel")
5155

5256
infModel := &v1alpha1.InferenceModel{}
57+
notFound := false
5358
if err := c.Get(ctx, req.NamespacedName, infModel); err != nil {
54-
if errors.IsNotFound(err) {
55-
loggerDefault.Info("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName)
56-
c.Datastore.ModelDelete(infModel.Spec.ModelName)
57-
return ctrl.Result{}, nil
59+
if !errors.IsNotFound(err) {
60+
logger.Error(err, "Unable to get InferenceModel")
61+
return ctrl.Result{}, err
5862
}
59-
loggerDefault.Error(err, "Unable to get InferenceModel", "name", req.NamespacedName)
63+
notFound = true
64+
}
65+
66+
if notFound || !infModel.DeletionTimestamp.IsZero() || infModel.Spec.PoolRef.Name != c.PoolNamespacedName.Name {
67+
// InferenceModel object got deleted or changed the referenced pool.
68+
err := c.handleModelDeleted(ctx, req.NamespacedName)
6069
return ctrl.Result{}, err
61-
} else if !infModel.DeletionTimestamp.IsZero() {
62-
loggerDefault.Info("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName)
63-
c.Datastore.ModelDelete(infModel.Spec.ModelName)
64-
return ctrl.Result{}, nil
6570
}
6671

67-
c.updateDatastore(logger, infModel)
72+
// Add or update if the InferenceModel instance has a creation timestamp older than the existing entry of the model.
73+
logger = logger.WithValues("poolRef", infModel.Spec.PoolRef).WithValues("modelName", infModel.Spec.ModelName)
74+
if !c.Datastore.ModelSetIfOlder(infModel) {
75+
logger.Info("Skipping InferenceModel, existing instance has older creation timestamp")
76+
return ctrl.Result{}, nil
}
78+
logger.Info("Added/Updated InferenceModel")
79+
6880
return ctrl.Result{}, nil
6981
}
7082

71-
func (c *InferenceModelReconciler) updateDatastore(logger logr.Logger, infModel *v1alpha1.InferenceModel) {
72-
loggerDefault := logger.V(logutil.DEFAULT)
83+
func (c *InferenceModelReconciler) handleModelDeleted(ctx context.Context, req types.NamespacedName) error {
84+
logger := log.FromContext(ctx)
85+
86+
// We will look up the modelName associated with this object to search for
87+
// other instance referencing the same ModelName if exist to store the oldest in
88+
// its place. This ensures that the InferenceModel with the oldest creation
89+
// timestamp is active.
90+
existing, exists := c.Datastore.ModelGetByObjName(req)
91+
if !exists {
92+
// No entry exists in the first place, nothing to do.
93+
return nil
94+
}
95+
// Delete the internal object, it may be replaced with another version below.
96+
c.Datastore.ModelDelete(req)
97+
logger.Info("InferenceModel removed from datastore", "poolRef", existing.Spec.PoolRef, "modelName", existing.Spec.ModelName)
7398

74-
if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name {
75-
loggerDefault.Info("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName)
76-
loggerDefault.Info("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName)
77-
c.Datastore.ModelSet(infModel)
78-
return
99+
// List all InferenceModels with a matching ModelName.
100+
var models v1alpha1.InferenceModelList
101+
if err := c.List(ctx, &models, client.MatchingFields{modelNameKey: existing.Spec.ModelName}, client.InNamespace(c.PoolNamespacedName.Namespace)); err != nil {
102+
return fmt.Errorf("listing models that match the modelName %s: %w", existing.Spec.ModelName, err)
79103
}
80-
loggerDefault.Info("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName)
81-
// If we get here. The model is not relevant to this pool, remove.
82-
c.Datastore.ModelDelete(infModel.Spec.ModelName)
104+
if len(models.Items) == 0 {
105+
// No other InferenceModel instances with this ModelName exist.
106+
return nil
107+
}
108+
109+
var oldest *v1alpha1.InferenceModel
110+
for i := range models.Items {
111+
m := &models.Items[i]
112+
if m.Spec.ModelName != existing.Spec.ModelName || // The index should filter those out, but just in case!
113+
m.Spec.PoolRef.Name != c.PoolNamespacedName.Name || // We don't care about other pools, we could setup an index on this too!
114+
m.Name == existing.Name { // We don't care about the same object, it could be in the list if it was only marked for deletion, but not yet deleted.
115+
continue
116+
}
117+
if oldest == nil || m.ObjectMeta.CreationTimestamp.Before(&oldest.ObjectMeta.CreationTimestamp) {
118+
oldest = m
119+
}
120+
}
121+
if oldest != nil && c.Datastore.ModelSetIfOlder(oldest) {
122+
logger.Info("InferenceModel replaced.",
123+
"poolRef", oldest.Spec.PoolRef,
124+
"modelName", oldest.Spec.ModelName,
125+
"newInferenceModel", types.NamespacedName{Name: oldest.Name, Namespace: oldest.Namespace})
126+
}
127+
128+
return nil
83129
}
84130

85-
func (c *InferenceModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
131+
func indexInferenceModelsByModelName(obj client.Object) []string {
132+
m, ok := obj.(*v1alpha1.InferenceModel)
133+
if !ok {
134+
return nil
135+
}
136+
return []string{string(m.Spec.ModelName)}
137+
}
138+
139+
func (c *InferenceModelReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
140+
// Create an index on ModelName for InferenceModel objects.
141+
indexer := mgr.GetFieldIndexer()
142+
if err := indexer.IndexField(ctx, &v1alpha1.InferenceModel{}, modelNameKey, indexInferenceModelsByModelName); err != nil {
143+
return fmt.Errorf("setting index on ModelName for InferenceModel: %w", err)
144+
}
86145
return ctrl.NewControllerManagedBy(mgr).
87146
For(&v1alpha1.InferenceModel{}).
88147
Complete(c)

0 commit comments

Comments
 (0)