Skip to content

Commit 02aa4c9

Browse files
committed
Added ModelResync to handle a race condition
1 parent f307bc5 commit 02aa4c9

File tree

4 files changed

+49
-51
lines changed

4 files changed

+49
-51
lines changed

pkg/epp/controller/inferencemodel_reconciler.go

Lines changed: 6 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,6 @@ import (
3434
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3535
)
3636

37-
const (
38-
modelNameKey = "spec.modelName"
39-
)
40-
4137
type InferenceModelReconciler struct {
4238
client.Client
4339
Scheme *runtime.Scheme
@@ -96,35 +92,13 @@ func (c *InferenceModelReconciler) handleModelDeleted(ctx context.Context, req t
9692
}
9793
logger.Info("InferenceModel removed from datastore", "poolRef", existing.Spec.PoolRef, "modelName", existing.Spec.ModelName)
9894

99-
// List all InferenceModels with a matching ModelName.
100-
var models v1alpha2.InferenceModelList
101-
if err := c.List(ctx, &models, client.MatchingFields{modelNameKey: existing.Spec.ModelName}, client.InNamespace(c.PoolNamespacedName.Namespace)); err != nil {
102-
return fmt.Errorf("listing models that match the modelName %s: %w", existing.Spec.ModelName, err)
95+
updated, err := c.Datastore.ModelResync(ctx, c.Client, existing.Spec.ModelName)
96+
if err != nil {
97+
return err
10398
}
104-
if len(models.Items) == 0 {
105-
// No other instances of InferenceModels with this ModelName exists.
106-
return nil
99+
if updated {
100+
logger.Info("Model replaced.", "modelName", existing.Spec.ModelName)
107101
}
108-
109-
var oldest *v1alpha2.InferenceModel
110-
for i := range models.Items {
111-
m := &models.Items[i]
112-
if m.Spec.ModelName != existing.Spec.ModelName || // The index should filter those out, but just in case!
113-
m.Spec.PoolRef.Name != c.PoolNamespacedName.Name || // We don't care about other pools, we could setup an index on this too!
114-
m.Name == existing.Name { // We don't care about the same object, it could be in the list if it was only marked for deletion, but not yet deleted.
115-
continue
116-
}
117-
if oldest == nil || m.ObjectMeta.CreationTimestamp.Before(&oldest.ObjectMeta.CreationTimestamp) {
118-
oldest = m
119-
}
120-
}
121-
if oldest != nil && c.Datastore.ModelSetIfOlder(oldest) {
122-
logger.Info("InferenceModel replaced.",
123-
"poolRef", oldest.Spec.PoolRef,
124-
"modelName", oldest.Spec.ModelName,
125-
"newInferenceModel", types.NamespacedName{Name: oldest.Name, Namespace: oldest.Namespace})
126-
}
127-
128102
return nil
129103
}
130104

@@ -139,7 +113,7 @@ func indexInferenceModelsByModelName(obj client.Object) []string {
139113
func (c *InferenceModelReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
140114
// Create an index on ModelName for InferenceModel objects.
141115
indexer := mgr.GetFieldIndexer()
142-
if err := indexer.IndexField(ctx, &v1alpha2.InferenceModel{}, modelNameKey, indexInferenceModelsByModelName); err != nil {
116+
if err := indexer.IndexField(ctx, &v1alpha2.InferenceModel{}, datastore.ModelNameIndexKey, indexInferenceModelsByModelName); err != nil {
143117
return fmt.Errorf("setting index on ModelName for InferenceModel: %w", err)
144118
}
145119
return ctrl.NewControllerManagedBy(mgr).

pkg/epp/controller/inferencemodel_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func TestInferenceModelReconciler(t *testing.T) {
198198
fakeClient := fake.NewClientBuilder().
199199
WithScheme(scheme).
200200
WithObjects(initObjs...).
201-
WithIndex(&v1alpha2.InferenceModel{}, modelNameKey, indexInferenceModelsByModelName).
201+
WithIndex(&v1alpha2.InferenceModel{}, datastore.ModelNameIndexKey, indexInferenceModelsByModelName).
202202
Build()
203203

204204
datastore := datastore.NewFakeDatastore(nil, test.modelsInStore, pool)

pkg/epp/datastore/datastore.go

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package datastore
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
2223
"math/rand"
2324
"sync"
2425

@@ -32,6 +33,10 @@ import (
3233
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3334
)
3435

36+
const (
37+
ModelNameIndexKey = "spec.modelName"
38+
)
39+
3540
var (
3641
errPoolNotSynced = errors.New("InferencePool is not initialized in data store")
3742
)
@@ -48,6 +53,7 @@ type Datastore interface {
4853
ModelSetIfOlder(infModel *v1alpha2.InferenceModel) bool
4954
ModelGet(modelName string) (*v1alpha2.InferenceModel, bool)
5055
ModelDelete(namespacedName types.NamespacedName) (*v1alpha2.InferenceModel, bool)
56+
ModelResync(ctx context.Context, ctrlClient client.Client, modelName string) (bool, error)
5157
ModelGetAll() []*v1alpha2.InferenceModel
5258

5359
// PodMetrics operations
@@ -156,20 +162,53 @@ func (ds *datastore) ModelSetIfOlder(infModel *v1alpha2.InferenceModel) bool {
156162
return false
157163
}
158164
}
159-
// Delete the model
160-
ds.modelDelete(types.NamespacedName{Name: infModel.Name, Namespace: infModel.Namespace})
165+
// Set the model.
161166
ds.models[infModel.Spec.ModelName] = infModel
162167
return true
163168
}
164169

170+
func (ds *datastore) ModelResync(ctx context.Context, c client.Client, modelName string) (bool, error) {
171+
ds.poolAndModelsMu.Lock()
172+
defer ds.poolAndModelsMu.Unlock()
173+
174+
var models v1alpha2.InferenceModelList
175+
if err := c.List(ctx, &models, client.MatchingFields{ModelNameIndexKey: modelName}, client.InNamespace(ds.pool.Namespace)); err != nil {
176+
return false, fmt.Errorf("listing models that match the modelName %s: %w", modelName, err)
177+
}
178+
if len(models.Items) == 0 {
179+
// No other instances of InferenceModels with this ModelName exists.
180+
return false, nil
181+
}
182+
183+
var oldest *v1alpha2.InferenceModel
184+
for i := range models.Items {
185+
m := &models.Items[i]
186+
if m.Spec.ModelName != modelName || // The index should filter those out, but just in case!
187+
m.Spec.PoolRef.Name != ds.pool.Name || // We don't care about other pools, we could setup an index on this too!
188+
!m.DeletionTimestamp.IsZero() { // ignore objects marked for deletion
189+
continue
190+
}
191+
if oldest == nil || m.ObjectMeta.CreationTimestamp.Before(&oldest.ObjectMeta.CreationTimestamp) {
192+
oldest = m
193+
}
194+
}
195+
if oldest == nil {
196+
return false, nil
197+
}
198+
ds.models[modelName] = oldest
199+
return true, nil
200+
}
201+
165202
func (ds *datastore) ModelGet(modelName string) (*v1alpha2.InferenceModel, bool) {
166203
ds.poolAndModelsMu.RLock()
167204
defer ds.poolAndModelsMu.RUnlock()
168205
m, exists := ds.models[modelName]
169206
return m, exists
170207
}
171208

172-
func (ds *datastore) modelDelete(namespacedName types.NamespacedName) (*v1alpha2.InferenceModel, bool) {
209+
func (ds *datastore) ModelDelete(namespacedName types.NamespacedName) (*v1alpha2.InferenceModel, bool) {
210+
ds.poolAndModelsMu.Lock()
211+
defer ds.poolAndModelsMu.Unlock()
173212
for _, m := range ds.models {
174213
if m.Name == namespacedName.Name && m.Namespace == namespacedName.Namespace {
175214
delete(ds.models, m.Spec.ModelName)
@@ -179,12 +218,6 @@ func (ds *datastore) modelDelete(namespacedName types.NamespacedName) (*v1alpha2
179218
return nil, false
180219
}
181220

182-
func (ds *datastore) ModelDelete(namespacedName types.NamespacedName) (*v1alpha2.InferenceModel, bool) {
183-
ds.poolAndModelsMu.Lock()
184-
defer ds.poolAndModelsMu.Unlock()
185-
return ds.modelDelete(namespacedName)
186-
}
187-
188221
func (ds *datastore) ModelGetAll() []*v1alpha2.InferenceModel {
189222
ds.poolAndModelsMu.RLock()
190223
defer ds.poolAndModelsMu.RUnlock()

pkg/epp/datastore/datastore_test.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,6 @@ func TestModel(t *testing.T) {
154154
wantOpResult: true,
155155
wantModels: []*v1alpha2.InferenceModel{model2ts},
156156
},
157-
{
158-
name: "Set model2 updated with a new modelName, should update modelName",
159-
existingModels: []*v1alpha2.InferenceModel{model2ts},
160-
op: func(ds Datastore) bool {
161-
return ds.ModelSetIfOlder(model2chat)
162-
},
163-
wantOpResult: true,
164-
wantModels: []*v1alpha2.InferenceModel{model2chat},
165-
},
166157
{
167158
name: "Set model1 with the tweet-summary modelName, both models should exist",
168159
existingModels: []*v1alpha2.InferenceModel{model2chat},

0 commit comments

Comments
 (0)