Skip to content

Commit d8794df

Browse files
ahg-gkfswain
authored andcommitted
Fix InferenceModel deletion logic (kubernetes-sigs#393)
* Currently the logic tracks the models by Spec.ModelName, since this is not guaranteed to be unique within the cluster, we could run into two issues: 1) If the model name changes on the same InferenceModel object, we don't delete the original model entry in the datastore. 2) We don't enforce the semantics that the modelName with the oldest creation timestamp is retained. While the api is assuming that this is enforced by another controller via the Ready condition, we don't have this controller yet, and so currently the behavior is unpredictable depending on InferenceModel events order. To address the above, the PR makes changes to both the InferenceModel reconciler and the Model APIs in the datastore to ensure thread safe updates of the entries. In the store, the sync.Map was replaced with two maps to track the InferenceModel entries by both ModelName and InferenceModel object NamespacedName. This is needed to properly handle deletions when the object doesn't exist anymore (could be handled in other ways, but this seemed like a reasonable approach). The PR increases the datastore pkg unit test coverage the Pool and Model APIs. We still need to followup with adding unit test coverage to the pods APIs, which is currently non-existent. * Convert unit test to a table * remove the dual map for the models store, and rely on linear search when looking up the model by object name * Added ModelResync to handle a race condition * Update pkg/epp/controller/inferencemodel_reconciler.go
1 parent b717d95 commit d8794df

15 files changed

+789
-546
lines changed

cmd/epp/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ func run() error {
149149
return err
150150
}
151151

152+
ctx := ctrl.SetupSignalHandler()
153+
152154
// Setup runner.
153155
datastore := datastore.NewDatastore()
154156
provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
@@ -165,7 +167,7 @@ func run() error {
165167
CertPath: *certPath,
166168
Provider: provider,
167169
}
168-
if err := serverRunner.SetupWithManager(mgr); err != nil {
170+
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
169171
setupLog.Error(err, "Failed to setup ext-proc controllers")
170172
return err
171173
}
@@ -188,7 +190,7 @@ func run() error {
188190

189191
// Start the manager. This blocks until a signal is received.
190192
setupLog.Info("Controller manager starting")
191-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
193+
if err := mgr.Start(ctx); err != nil {
192194
setupLog.Error(err, "Error starting controller manager")
193195
return err
194196
}

pkg/epp/backend/provider_test.go

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package backend
1919
import (
2020
"context"
2121
"errors"
22-
"sync"
2322
"testing"
2423
"time"
2524

@@ -37,6 +36,9 @@ var (
3736
Name: "pod1",
3837
},
3938
},
39+
}
40+
pod1WithMetrics = &datastore.PodMetrics{
41+
Pod: pod1.Pod,
4042
Metrics: datastore.Metrics{
4143
WaitingQueueSize: 0,
4244
KVCacheUsagePercent: 0.2,
@@ -53,6 +55,9 @@ var (
5355
Name: "pod2",
5456
},
5557
},
58+
}
59+
pod2WithMetrics = &datastore.PodMetrics{
60+
Pod: pod2.Pod,
5661
Metrics: datastore.Metrics{
5762
WaitingQueueSize: 1,
5863
KVCacheUsagePercent: 0.2,
@@ -69,35 +74,30 @@ func TestProvider(t *testing.T) {
6974
tests := []struct {
7075
name string
7176
pmc PodMetricsClient
72-
datastore datastore.Datastore
77+
storePods []*datastore.PodMetrics
7378
want []*datastore.PodMetrics
7479
}{
7580
{
7681
name: "Probing metrics success",
7782
pmc: &FakePodMetricsClient{
7883
Res: map[types.NamespacedName]*datastore.PodMetrics{
79-
pod1.NamespacedName: pod1,
80-
pod2.NamespacedName: pod2,
84+
pod1.NamespacedName: pod1WithMetrics,
85+
pod2.NamespacedName: pod2WithMetrics,
8186
},
8287
},
83-
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
84-
want: []*datastore.PodMetrics{
85-
pod1,
86-
pod2,
87-
},
88+
storePods: []*datastore.PodMetrics{pod1, pod2},
89+
want: []*datastore.PodMetrics{pod1WithMetrics, pod2WithMetrics},
8890
},
8991
{
9092
name: "Only pods in the datastore are probed",
9193
pmc: &FakePodMetricsClient{
9294
Res: map[types.NamespacedName]*datastore.PodMetrics{
93-
pod1.NamespacedName: pod1,
94-
pod2.NamespacedName: pod2,
95+
pod1.NamespacedName: pod1WithMetrics,
96+
pod2.NamespacedName: pod2WithMetrics,
9597
},
9698
},
97-
datastore: datastore.NewFakeDatastore(populateMap(pod1), nil, nil),
98-
want: []*datastore.PodMetrics{
99-
pod1,
100-
},
99+
storePods: []*datastore.PodMetrics{pod1},
100+
want: []*datastore.PodMetrics{pod1WithMetrics},
101101
},
102102
{
103103
name: "Probing metrics error",
@@ -106,13 +106,12 @@ func TestProvider(t *testing.T) {
106106
pod2.NamespacedName: errors.New("injected error"),
107107
},
108108
Res: map[types.NamespacedName]*datastore.PodMetrics{
109-
pod1.NamespacedName: pod1,
109+
pod1.NamespacedName: pod1WithMetrics,
110110
},
111111
},
112-
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
113-
112+
storePods: []*datastore.PodMetrics{pod1, pod2},
114113
want: []*datastore.PodMetrics{
115-
pod1,
114+
pod1WithMetrics,
116115
// Failed to fetch pod2 metrics so it remains the default values.
117116
{
118117
Pod: datastore.Pod{NamespacedName: pod2.NamespacedName},
@@ -128,12 +127,13 @@ func TestProvider(t *testing.T) {
128127

129128
for _, test := range tests {
130129
t.Run(test.name, func(t *testing.T) {
131-
p := NewProvider(test.pmc, test.datastore)
130+
ds := datastore.NewFakeDatastore(test.storePods, nil, nil)
131+
p := NewProvider(test.pmc, ds)
132132
ctx, cancel := context.WithCancel(context.Background())
133133
defer cancel()
134134
_ = p.Init(ctx, time.Millisecond, time.Millisecond)
135135
assert.EventuallyWithT(t, func(t *assert.CollectT) {
136-
metrics := test.datastore.PodGetAll()
136+
metrics := ds.PodGetAll()
137137
diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *datastore.PodMetrics) bool {
138138
return a.String() < b.String()
139139
}))
@@ -142,11 +142,3 @@ func TestProvider(t *testing.T) {
142142
})
143143
}
144144
}
145-
146-
func populateMap(pods ...*datastore.PodMetrics) *sync.Map {
147-
newMap := &sync.Map{}
148-
for _, pod := range pods {
149-
newMap.Store(pod.NamespacedName, &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}})
150-
}
151-
return newMap
152-
}

pkg/epp/controller/inferencemodel_reconciler.go

Lines changed: 61 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package controller
1818

1919
import (
2020
"context"
21+
"fmt"
2122

22-
"github.com/go-logr/logr"
2323
"k8s.io/apimachinery/pkg/api/errors"
2424
"k8s.io/apimachinery/pkg/runtime"
2525
"k8s.io/apimachinery/pkg/types"
@@ -43,44 +43,80 @@ type InferenceModelReconciler struct {
4343
}
4444

4545
func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
46-
logger := log.FromContext(ctx)
47-
loggerDefault := logger.V(logutil.DEFAULT)
48-
loggerDefault.Info("Reconciling InferenceModel", "name", req.NamespacedName)
46+
if req.Namespace != c.PoolNamespacedName.Namespace {
47+
return ctrl.Result{}, nil
48+
}
49+
logger := log.FromContext(ctx).V(logutil.DEFAULT).WithValues("inferenceModel", req.Name)
50+
ctx = ctrl.LoggerInto(ctx, logger)
51+
52+
logger.Info("Reconciling InferenceModel")
4953

5054
infModel := &v1alpha2.InferenceModel{}
55+
notFound := false
5156
if err := c.Get(ctx, req.NamespacedName, infModel); err != nil {
52-
if errors.IsNotFound(err) {
53-
loggerDefault.Info("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName)
54-
c.Datastore.ModelDelete(infModel.Spec.ModelName)
55-
return ctrl.Result{}, nil
57+
if !errors.IsNotFound(err) {
58+
logger.Error(err, "Unable to get InferenceModel")
59+
return ctrl.Result{}, err
5660
}
57-
loggerDefault.Error(err, "Unable to get InferenceModel", "name", req.NamespacedName)
61+
notFound = true
62+
}
63+
64+
if notFound || !infModel.DeletionTimestamp.IsZero() || infModel.Spec.PoolRef.Name != c.PoolNamespacedName.Name {
65+
// InferenceModel object got deleted or changed the referenced pool.
66+
err := c.handleModelDeleted(ctx, req.NamespacedName)
5867
return ctrl.Result{}, err
59-
} else if !infModel.DeletionTimestamp.IsZero() {
60-
loggerDefault.Info("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName)
61-
c.Datastore.ModelDelete(infModel.Spec.ModelName)
62-
return ctrl.Result{}, nil
6368
}
6469

65-
c.updateDatastore(logger, infModel)
70+
// Add or update if the InferenceModel instance has a creation timestamp older than the existing entry of the model.
71+
logger = logger.WithValues("poolRef", infModel.Spec.PoolRef).WithValues("modelName", infModel.Spec.ModelName)
72+
if !c.Datastore.ModelSetIfOlder(infModel) {
73+
logger.Info("Skipping InferenceModel, existing instance has older creation timestamp")
74+
75+
}
76+
logger.Info("Added/Updated InferenceModel")
77+
6678
return ctrl.Result{}, nil
6779
}
6880

69-
func (c *InferenceModelReconciler) updateDatastore(logger logr.Logger, infModel *v1alpha2.InferenceModel) {
70-
loggerDefault := logger.V(logutil.DEFAULT)
81+
func (c *InferenceModelReconciler) handleModelDeleted(ctx context.Context, req types.NamespacedName) error {
82+
logger := log.FromContext(ctx)
7183

72-
if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name {
73-
loggerDefault.Info("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName)
74-
loggerDefault.Info("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName)
75-
c.Datastore.ModelSet(infModel)
76-
return
84+
// We will lookup and delete the modelName associated with this object, and search for
85+
// other instances referencing the same modelName if exist, and store the oldest in
86+
// its place. This ensures that the InferenceModel with the oldest creation
87+
// timestamp is active.
88+
existing, exists := c.Datastore.ModelDelete(req)
89+
if !exists {
90+
// No entry exists in the first place, nothing to do.
91+
return nil
92+
}
93+
logger.Info("InferenceModel removed from datastore", "poolRef", existing.Spec.PoolRef, "modelName", existing.Spec.ModelName)
94+
95+
// TODO(#409): replace this backfill logic with one that is based on InferenceModel Ready conditions once those are set by an external controller.
96+
updated, err := c.Datastore.ModelResync(ctx, c.Client, existing.Spec.ModelName)
97+
if err != nil {
98+
return err
99+
}
100+
if updated {
101+
logger.Info("Model replaced.", "modelName", existing.Spec.ModelName)
77102
}
78-
loggerDefault.Info("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName)
79-
// If we get here. The model is not relevant to this pool, remove.
80-
c.Datastore.ModelDelete(infModel.Spec.ModelName)
103+
return nil
81104
}
82105

83-
func (c *InferenceModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
106+
func indexInferenceModelsByModelName(obj client.Object) []string {
107+
m, ok := obj.(*v1alpha2.InferenceModel)
108+
if !ok {
109+
return nil
110+
}
111+
return []string{m.Spec.ModelName}
112+
}
113+
114+
func (c *InferenceModelReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
115+
// Create an index on ModelName for InferenceModel objects.
116+
indexer := mgr.GetFieldIndexer()
117+
if err := indexer.IndexField(ctx, &v1alpha2.InferenceModel{}, datastore.ModelNameIndexKey, indexInferenceModelsByModelName); err != nil {
118+
return fmt.Errorf("setting index on ModelName for InferenceModel: %w", err)
119+
}
84120
return ctrl.NewControllerManagedBy(mgr).
85121
For(&v1alpha2.InferenceModel{}).
86122
WithEventFilter(predicate.Funcs{

0 commit comments

Comments
 (0)