Skip to content

Commit a807b40

Browse files
committed
Currently the logic tracks the models by Spec.ModelName, since this is not guaranteed to be unique within the cluster, we could run into two issues:
1) If the model name changes on the same InferenceModel object, we don't delete the original model entry in the datastore. 2) We don't enforce the semantics that the modelName with the oldest creation timestamp is retained. While the api is assuming that this is enforced by another controller via the Ready condition, we don't have this controller yet, and so currently the behavior is unpredictable depending on InferenceModel events order. To address the above, the PR makes changes to both the InferenceModel reconciler and the Model APIs in the datastore to ensure thread safe updates of the entries. In the store, the sync.Map was replaced with two maps to track the InferenceModel entries by both ModelName and InferenceModel object NamespacedName. This is needed to properly handle deletions when the object doesn't exist anymore (could be handled in other ways, but this seemed like a reasonable approach). The PR increases the datastore pkg unit test coverage the Pool and Model APIs. We still need to followup with adding unit test coverage to the pods APIs, which is currently non-existent.
1 parent 2ad70e3 commit a807b40

16 files changed

+784
-542
lines changed

cmd/epp/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ func run() error {
149149
return err
150150
}
151151

152+
ctx := ctrl.SetupSignalHandler()
153+
152154
// Setup runner.
153155
datastore := datastore.NewDatastore()
154156
provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
@@ -165,7 +167,7 @@ func run() error {
165167
CertPath: *certPath,
166168
Provider: provider,
167169
}
168-
if err := serverRunner.SetupWithManager(mgr); err != nil {
170+
if err := serverRunner.SetupWithManager(ctx, mgr); err != nil {
169171
setupLog.Error(err, "Failed to setup ext-proc controllers")
170172
return err
171173
}
@@ -188,7 +190,7 @@ func run() error {
188190

189191
// Start the manager. This blocks until a signal is received.
190192
setupLog.Info("Controller manager starting")
191-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
193+
if err := mgr.Start(ctx); err != nil {
192194
setupLog.Error(err, "Error starting controller manager")
193195
return err
194196
}

pkg/epp/backend/provider_test.go

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package backend
1919
import (
2020
"context"
2121
"errors"
22-
"sync"
2322
"testing"
2423
"time"
2524

@@ -37,6 +36,9 @@ var (
3736
Name: "pod1",
3837
},
3938
},
39+
}
40+
pod1WithMetrics = &datastore.PodMetrics{
41+
Pod: pod1.Pod,
4042
Metrics: datastore.Metrics{
4143
WaitingQueueSize: 0,
4244
KVCacheUsagePercent: 0.2,
@@ -53,6 +55,9 @@ var (
5355
Name: "pod2",
5456
},
5557
},
58+
}
59+
pod2WithMetrics = &datastore.PodMetrics{
60+
Pod: pod2.Pod,
5661
Metrics: datastore.Metrics{
5762
WaitingQueueSize: 1,
5863
KVCacheUsagePercent: 0.2,
@@ -69,35 +74,30 @@ func TestProvider(t *testing.T) {
6974
tests := []struct {
7075
name string
7176
pmc PodMetricsClient
72-
datastore datastore.Datastore
77+
storePods []*datastore.PodMetrics
7378
want []*datastore.PodMetrics
7479
}{
7580
{
7681
name: "Probing metrics success",
7782
pmc: &FakePodMetricsClient{
7883
Res: map[types.NamespacedName]*datastore.PodMetrics{
79-
pod1.NamespacedName: pod1,
80-
pod2.NamespacedName: pod2,
84+
pod1.NamespacedName: pod1WithMetrics,
85+
pod2.NamespacedName: pod2WithMetrics,
8186
},
8287
},
83-
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
84-
want: []*datastore.PodMetrics{
85-
pod1,
86-
pod2,
87-
},
88+
storePods: []*datastore.PodMetrics{pod1, pod2},
89+
want: []*datastore.PodMetrics{pod1WithMetrics, pod2WithMetrics},
8890
},
8991
{
9092
name: "Only pods in the datastore are probed",
9193
pmc: &FakePodMetricsClient{
9294
Res: map[types.NamespacedName]*datastore.PodMetrics{
93-
pod1.NamespacedName: pod1,
94-
pod2.NamespacedName: pod2,
95+
pod1.NamespacedName: pod1WithMetrics,
96+
pod2.NamespacedName: pod2WithMetrics,
9597
},
9698
},
97-
datastore: datastore.NewFakeDatastore(populateMap(pod1), nil, nil),
98-
want: []*datastore.PodMetrics{
99-
pod1,
100-
},
99+
storePods: []*datastore.PodMetrics{pod1},
100+
want: []*datastore.PodMetrics{pod1WithMetrics},
101101
},
102102
{
103103
name: "Probing metrics error",
@@ -106,13 +106,12 @@ func TestProvider(t *testing.T) {
106106
pod2.NamespacedName: errors.New("injected error"),
107107
},
108108
Res: map[types.NamespacedName]*datastore.PodMetrics{
109-
pod1.NamespacedName: pod1,
109+
pod1.NamespacedName: pod1WithMetrics,
110110
},
111111
},
112-
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
113-
112+
storePods: []*datastore.PodMetrics{pod1, pod2},
114113
want: []*datastore.PodMetrics{
115-
pod1,
114+
pod1WithMetrics,
116115
// Failed to fetch pod2 metrics so it remains the default values.
117116
{
118117
Pod: datastore.Pod{NamespacedName: pod2.NamespacedName},
@@ -128,12 +127,13 @@ func TestProvider(t *testing.T) {
128127

129128
for _, test := range tests {
130129
t.Run(test.name, func(t *testing.T) {
131-
p := NewProvider(test.pmc, test.datastore)
130+
ds := datastore.NewFakeDatastore(test.storePods, nil, nil)
131+
p := NewProvider(test.pmc, ds)
132132
ctx, cancel := context.WithCancel(context.Background())
133133
defer cancel()
134134
_ = p.Init(ctx, time.Millisecond, time.Millisecond)
135135
assert.EventuallyWithT(t, func(t *assert.CollectT) {
136-
metrics := test.datastore.PodGetAll()
136+
metrics := ds.PodGetAll()
137137
diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *datastore.PodMetrics) bool {
138138
return a.String() < b.String()
139139
}))
@@ -142,11 +142,3 @@ func TestProvider(t *testing.T) {
142142
})
143143
}
144144
}
145-
146-
func populateMap(pods ...*datastore.PodMetrics) *sync.Map {
147-
newMap := &sync.Map{}
148-
for _, pod := range pods {
149-
newMap.Store(pod.NamespacedName, &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}})
150-
}
151-
return newMap
152-
}

pkg/epp/controller/inferencemodel_reconciler.go

Lines changed: 88 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package controller
1818

1919
import (
2020
"context"
21+
"fmt"
2122

22-
"github.com/go-logr/logr"
2323
"k8s.io/apimachinery/pkg/api/errors"
2424
"k8s.io/apimachinery/pkg/runtime"
2525
"k8s.io/apimachinery/pkg/types"
@@ -34,6 +34,10 @@ import (
3434
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3535
)
3636

37+
const (
38+
modelNameKey = "spec.modelName"
39+
)
40+
3741
type InferenceModelReconciler struct {
3842
client.Client
3943
Scheme *runtime.Scheme
@@ -43,44 +47,103 @@ type InferenceModelReconciler struct {
4347
}
4448

4549
func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
46-
logger := log.FromContext(ctx)
47-
loggerDefault := logger.V(logutil.DEFAULT)
48-
loggerDefault.Info("Reconciling InferenceModel", "name", req.NamespacedName)
50+
if req.Namespace != c.PoolNamespacedName.Namespace {
51+
return ctrl.Result{}, nil
52+
}
53+
logger := log.FromContext(ctx).V(logutil.DEFAULT).WithValues("inferenceModel", req.Name)
54+
ctx = ctrl.LoggerInto(ctx, logger)
55+
56+
logger.Info("Reconciling InferenceModel")
4957

5058
infModel := &v1alpha2.InferenceModel{}
59+
notFound := false
5160
if err := c.Get(ctx, req.NamespacedName, infModel); err != nil {
52-
if errors.IsNotFound(err) {
53-
loggerDefault.Info("InferenceModel not found. Removing from datastore since object must be deleted", "name", req.NamespacedName)
54-
c.Datastore.ModelDelete(infModel.Spec.ModelName)
55-
return ctrl.Result{}, nil
61+
if !errors.IsNotFound(err) {
62+
logger.Error(err, "Unable to get InferenceModel")
63+
return ctrl.Result{}, err
5664
}
57-
loggerDefault.Error(err, "Unable to get InferenceModel", "name", req.NamespacedName)
65+
notFound = true
66+
}
67+
68+
if notFound || !infModel.DeletionTimestamp.IsZero() || infModel.Spec.PoolRef.Name != c.PoolNamespacedName.Name {
69+
// InferenceModel object got deleted or changed the referenced pool.
70+
err := c.handleModelDeleted(ctx, req.NamespacedName)
5871
return ctrl.Result{}, err
59-
} else if !infModel.DeletionTimestamp.IsZero() {
60-
loggerDefault.Info("InferenceModel is marked for deletion. Removing from datastore", "name", req.NamespacedName)
61-
c.Datastore.ModelDelete(infModel.Spec.ModelName)
62-
return ctrl.Result{}, nil
6372
}
6473

65-
c.updateDatastore(logger, infModel)
74+
// Add or update if the InferenceModel instance has a creation timestamp older than the existing entry of the model.
75+
logger = logger.WithValues("poolRef", infModel.Spec.PoolRef).WithValues("modelName", infModel.Spec.ModelName)
76+
if !c.Datastore.ModelSetIfOlder(infModel) {
77+
logger.Info("Skipping InferenceModel, existing instance has older creation timestamp")
78+
79+
}
80+
logger.Info("Added/Updated InferenceModel")
81+
6682
return ctrl.Result{}, nil
6783
}
6884

69-
func (c *InferenceModelReconciler) updateDatastore(logger logr.Logger, infModel *v1alpha2.InferenceModel) {
70-
loggerDefault := logger.V(logutil.DEFAULT)
85+
func (c *InferenceModelReconciler) handleModelDeleted(ctx context.Context, req types.NamespacedName) error {
86+
logger := log.FromContext(ctx)
87+
88+
// We will lookup the modelName associated with this object to search for
89+
// other instance referencing the same ModelName if exist to store the oldest in
90+
// its place. This ensures that the InferenceModel with the oldest creation
91+
// timestamp is active.
92+
existing, exists := c.Datastore.ModelGetByObjName(req)
93+
if !exists {
94+
// No entry exists in the first place, nothing to do.
95+
return nil
96+
}
97+
// Delete the internal object, it may be replaced with another version below.
98+
c.Datastore.ModelDelete(req)
99+
logger.Info("InferenceModel removed from datastore", "poolRef", existing.Spec.PoolRef, "modelName", existing.Spec.ModelName)
71100

72-
if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name {
73-
loggerDefault.Info("Updating datastore", "poolRef", infModel.Spec.PoolRef, "serverPoolName", c.PoolNamespacedName)
74-
loggerDefault.Info("Adding/Updating InferenceModel", "modelName", infModel.Spec.ModelName)
75-
c.Datastore.ModelSet(infModel)
76-
return
101+
// List all InferenceModels with a matching ModelName.
102+
var models v1alpha2.InferenceModelList
103+
if err := c.List(ctx, &models, client.MatchingFields{modelNameKey: existing.Spec.ModelName}, client.InNamespace(c.PoolNamespacedName.Namespace)); err != nil {
104+
return fmt.Errorf("listing models that match the modelName %s: %w", existing.Spec.ModelName, err)
105+
}
106+
if len(models.Items) == 0 {
107+
// No other instances of InferenceModels with this ModelName exists.
108+
return nil
109+
}
110+
111+
var oldest *v1alpha2.InferenceModel
112+
for i := range models.Items {
113+
m := &models.Items[i]
114+
if m.Spec.ModelName != existing.Spec.ModelName || // The index should filter those out, but just in case!
115+
m.Spec.PoolRef.Name != c.PoolNamespacedName.Name || // We don't care about other pools, we could setup an index on this too!
116+
m.Name == existing.Name { // We don't care about the same object, it could be in the list if it was only marked for deletion, but not yet deleted.
117+
continue
118+
}
119+
if oldest == nil || m.ObjectMeta.CreationTimestamp.Before(&oldest.ObjectMeta.CreationTimestamp) {
120+
oldest = m
121+
}
77122
}
78-
loggerDefault.Info("Removing/Not adding InferenceModel", "modelName", infModel.Spec.ModelName)
79-
// If we get here. The model is not relevant to this pool, remove.
80-
c.Datastore.ModelDelete(infModel.Spec.ModelName)
123+
if oldest != nil && c.Datastore.ModelSetIfOlder(oldest) {
124+
logger.Info("InferenceModel replaced.",
125+
"poolRef", oldest.Spec.PoolRef,
126+
"modelName", oldest.Spec.ModelName,
127+
"newInferenceModel", types.NamespacedName{Name: oldest.Name, Namespace: oldest.Namespace})
128+
}
129+
130+
return nil
131+
}
132+
133+
func indexInferenceModelsByModelName(obj client.Object) []string {
134+
m, ok := obj.(*v1alpha2.InferenceModel)
135+
if !ok {
136+
return nil
137+
}
138+
return []string{m.Spec.ModelName}
81139
}
82140

83-
func (c *InferenceModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
141+
func (c *InferenceModelReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
142+
// Create an index on ModelName for InferenceModel objects.
143+
indexer := mgr.GetFieldIndexer()
144+
if err := indexer.IndexField(ctx, &v1alpha2.InferenceModel{}, modelNameKey, indexInferenceModelsByModelName); err != nil {
145+
return fmt.Errorf("setting index on ModelName for InferenceModel: %w", err)
146+
}
84147
return ctrl.NewControllerManagedBy(mgr).
85148
For(&v1alpha2.InferenceModel{}).
86149
WithEventFilter(predicate.Funcs{

0 commit comments

Comments
 (0)