fix: adds ErrorNotFound Handling for InferenceModel Reconciler #286


Merged · 1 commit · Feb 10, 2025
34 changes: 20 additions & 14 deletions pkg/ext-proc/backend/inferencemodel_reconciler.go
@@ -5,6 +5,7 @@ import (

"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
@@ -25,32 +26,37 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
if req.Namespace != c.PoolNamespacedName.Namespace {
return ctrl.Result{}, nil
}
klog.V(1).Infof("reconciling InferenceModel %v", req.NamespacedName)

service := &v1alpha1.InferenceModel{}
if err := c.Get(ctx, req.NamespacedName, service); err != nil {
klog.Error(err, "unable to get InferencePool")
klog.V(1).Infof("Reconciling InferenceModel %v", req.NamespacedName)
Contributor: use logging levels pls, DEFAULT in this case

Collaborator: ++, setting this standard makes it easier for contributors to follow, and makes it easier to reason about what the author's intent was wrt log level.

Edit: I see some areas in this file using V(1) already. We can kick this to a follow-up PR. Made: #307

Contributor: sg, let's merge this and follow up with a PR for the logging levels.


infModel := &v1alpha1.InferenceModel{}
if err := c.Get(ctx, req.NamespacedName, infModel); err != nil {
if errors.IsNotFound(err) {
klog.V(1).Infof("InferenceModel %v not found. Removing from datastore since object must be deleted", req.NamespacedName)
Contributor: ditto on using logging levels
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
Collaborator: I'm a little worried about doing this, but maybe it's unfounded.

My concern is that if for some reason there is a small lapse in communication with the API server, we could remove the inference model from the list of available models until the next reconciliation event (I believe the max time is 10 min, assuming there is no event that triggers reconciliation). So in the worst case, we could knock down a user's service for 10 min, and then it pops back up unexpectedly, making for a rather nasty heisenbug. It's unfortunate that controller-runtime doesn't let you separate by delete/create/update events; this would be much more easily remedied.

WDYT?

Collaborator: Should we instead just not return the err? As I believe that causes controller-runtime to requeue the reconciliation obj. But that could lead to the issue of leaving up an IM that doesn't actually exist (granted, we currently have that issue).

For the long term we could (not this PR):
Contributor: We also need to check for deletionTimestamp and remove the InferenceModel if set.

Taking a step back, do we actually need an InferenceModel reconciler? Do we need to store the InferenceModel objects ourselves? If not, I think we can drop it and just Get it (controller-runtime has an informer cache underneath).

Contributor: @kfswain we can set up handlers for delete/create/update with controller-runtime; see for example kueue: https://github.com/kubernetes-sigs/kueue/blob/main/pkg/controller/core/resourceflavor_controller.go#L145
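The idea of a dedicated delete handler keeping the datastore in sync, rather than inferring deletion from a NotFound on Get, can be illustrated with a dependency-free sketch (the event and datastore types are illustrative stand-ins, not controller-runtime's handler interfaces):

```go
package main

import (
	"fmt"
	"sync"
)

type eventKind int

const (
	eventUpsert eventKind = iota
	eventDelete
)

type modelEvent struct {
	kind      eventKind
	modelName string
}

// datastore mirrors the reconciler's sync.Map of InferenceModels.
type datastore struct{ models sync.Map }

// handle routes events the way separate Create/Update/Delete handlers would:
// a delete event prunes the map directly, carrying the final object state,
// instead of relying on a later Get returning NotFound.
func (d *datastore) handle(e modelEvent) {
	switch e.kind {
	case eventUpsert:
		d.models.Store(e.modelName, struct{}{})
	case eventDelete:
		d.models.Delete(e.modelName)
	}
}

func main() {
	d := &datastore{}
	d.handle(modelEvent{eventUpsert, "fake-model"})
	d.handle(modelEvent{eventDelete, "fake-model"})
	_, ok := d.models.Load("fake-model")
	fmt.Println("present after delete:", ok)
}
```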

Collaborator:

> We also need to check for deletionTimestamp and remove the InferenceModel if set.
> Taking a step back, do we actually need an inferenceModel reconciler? do we need to store the inferenceModel objects ourselves?

I was thinking about that also. Were we to ever set up fairness, we would need to hold on to these objects, or at least their name as the key value for a cache of traffic data. Otherwise we could fetch as needed, as long as it's using a cache and not blasting the api-server. But is it any better than what we would do here?

Contributor: yeah, we need that for fairness, we should keep it.

I think we need to set up a Delete handler to reliably delete the object from the store, just like we do with Kueue.

Collaborator: Definitely agreed there. Made: #310.

I think for this PR we can reduce the logging noise, and then have a separate PR addressing deletion events.

Is that fair?

Contributor: sg

return ctrl.Result{}, nil
}
klog.Error(err, "Unable to get InferenceModel")
return ctrl.Result{}, err
}

c.updateDatastore(service)
c.updateDatastore(infModel)
return ctrl.Result{}, nil
}

func (c *InferenceModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.InferenceModel{}).
Complete(c)
}

func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceModel) {
if infModel.Spec.PoolRef.Name == c.PoolNamespacedName.Name {
klog.V(1).Infof("Incoming pool ref %v, server pool name: %v", infModel.Spec.PoolRef, c.PoolNamespacedName.Name)
klog.V(1).Infof("Adding/Updating inference model: %v", infModel.Spec.ModelName)
klog.V(1).Infof("Adding/Updating InferenceModel: %v", infModel.Spec.ModelName)
c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel)
return
}
klog.V(logutil.DEFAULT).Infof("Removing/Not adding inference model: %v", infModel.Spec.ModelName)
klog.V(logutil.DEFAULT).Infof("Removing/Not adding InferenceModel: %v", infModel.Spec.ModelName)
// If we get here, the model is not relevant to this pool; remove it.
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
}

func (c *InferenceModelReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.InferenceModel{}).
Complete(c)
}
133 changes: 118 additions & 15 deletions pkg/ext-proc/backend/inferencemodel_reconciler_test.go
@@ -1,16 +1,22 @@
package backend

import (
"context"
"sync"
"testing"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

var (
service1 = &v1alpha1.InferenceModel{
infModel1 = &v1alpha1.InferenceModel{
Spec: v1alpha1.InferenceModelSpec{
ModelName: "fake model1",
PoolRef: v1alpha1.PoolObjectReference{Name: "test-pool"},
@@ -19,7 +25,7 @@ var (
Name: "test-service",
},
}
service1Modified = &v1alpha1.InferenceModel{
infModel1Modified = &v1alpha1.InferenceModel{
Spec: v1alpha1.InferenceModelSpec{
ModelName: "fake model1",
PoolRef: v1alpha1.PoolObjectReference{Name: "test-poolio"},
@@ -28,7 +34,7 @@ var (
Name: "test-service",
},
}
service2 = &v1alpha1.InferenceModel{
infModel2 = &v1alpha1.InferenceModel{
Spec: v1alpha1.InferenceModelSpec{
ModelName: "fake model",
PoolRef: v1alpha1.PoolObjectReference{Name: "test-pool"},
@@ -60,8 +66,8 @@ func TestUpdateDatastore_InferenceModelReconciler(t *testing.T) {
},
InferenceModels: &sync.Map{},
},
incomingService: service1,
wantInferenceModels: populateServiceMap(service1),
incomingService: infModel1,
wantInferenceModels: populateServiceMap(infModel1),
},
{
name: "Removing existing service.",
@@ -75,9 +81,9 @@ func TestUpdateDatastore_InferenceModelReconciler(t *testing.T) {
ResourceVersion: "Old and boring",
},
},
InferenceModels: populateServiceMap(service1),
InferenceModels: populateServiceMap(infModel1),
},
incomingService: service1Modified,
incomingService: infModel1Modified,
wantInferenceModels: populateServiceMap(),
},
{
Expand All @@ -92,7 +98,7 @@ func TestUpdateDatastore_InferenceModelReconciler(t *testing.T) {
ResourceVersion: "Old and boring",
},
},
InferenceModels: populateServiceMap(service1),
InferenceModels: populateServiceMap(infModel1),
},
incomingService: &v1alpha1.InferenceModel{
Spec: v1alpha1.InferenceModelSpec{
@@ -103,7 +109,7 @@ func TestUpdateDatastore_InferenceModelReconciler(t *testing.T) {
Name: "unrelated-service",
},
},
wantInferenceModels: populateServiceMap(service1),
wantInferenceModels: populateServiceMap(infModel1),
},
{
name: "Add to existing",
@@ -117,27 +123,124 @@ func TestUpdateDatastore_InferenceModelReconciler(t *testing.T) {
ResourceVersion: "Old and boring",
},
},
InferenceModels: populateServiceMap(service1),
InferenceModels: populateServiceMap(infModel1),
},
incomingService: service2,
wantInferenceModels: populateServiceMap(service1, service2),
incomingService: infModel2,
wantInferenceModels: populateServiceMap(infModel1, infModel2),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
InferenceModelReconciler := &InferenceModelReconciler{
reconciler := &InferenceModelReconciler{
Datastore: test.datastore,
PoolNamespacedName: types.NamespacedName{Name: test.datastore.inferencePool.Name},
}
InferenceModelReconciler.updateDatastore(test.incomingService)
reconciler.updateDatastore(test.incomingService)

if ok := mapsEqual(InferenceModelReconciler.Datastore.InferenceModels, test.wantInferenceModels); !ok {
if ok := mapsEqual(reconciler.Datastore.InferenceModels, test.wantInferenceModels); !ok {
t.Error("Maps are not equal")
}
})
}
}

func TestReconcile_ResourceNotFound(t *testing.T) {
// Set up the scheme.
scheme := runtime.NewScheme()
_ = v1alpha1.AddToScheme(scheme)

// Create a fake client with no InferenceModel objects.
fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build()

// Create a minimal datastore.
datastore := &K8sDatastore{
InferenceModels: &sync.Map{},
inferencePool: &v1alpha1.InferencePool{
ObjectMeta: metav1.ObjectMeta{Name: "test-pool"},
},
}

// Create the reconciler.
reconciler := &InferenceModelReconciler{
Client: fakeClient,
Scheme: scheme,
Record: record.NewFakeRecorder(10),
Datastore: datastore,
PoolNamespacedName: types.NamespacedName{Name: "test-pool"},
}

// Create a request for a non-existent resource.
req := ctrl.Request{NamespacedName: types.NamespacedName{Name: "non-existent-model", Namespace: "default"}}

// Call Reconcile.
result, err := reconciler.Reconcile(context.Background(), req)
if err != nil {
t.Fatalf("expected no error when resource is not found, got %v", err)
}

// Check that no requeue is requested.
if result.Requeue || result.RequeueAfter != 0 {
t.Errorf("expected no requeue, got %+v", result)
}
}

func TestReconcile_ResourceExists(t *testing.T) {
// Set up the scheme.
scheme := runtime.NewScheme()
_ = v1alpha1.AddToScheme(scheme)

// Create an InferenceModel object.
existingModel := &v1alpha1.InferenceModel{
ObjectMeta: metav1.ObjectMeta{
Name: "existing-model",
Namespace: "default",
},
Spec: v1alpha1.InferenceModelSpec{
ModelName: "fake-model",
PoolRef: v1alpha1.PoolObjectReference{Name: "test-pool"},
},
}

// Create a fake client with the existing model.
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(existingModel).Build()

// Create a minimal datastore.
datastore := &K8sDatastore{
InferenceModels: &sync.Map{},
inferencePool: &v1alpha1.InferencePool{
ObjectMeta: metav1.ObjectMeta{Name: "test-pool"},
},
}

// Create the reconciler.
reconciler := &InferenceModelReconciler{
Client: fakeClient,
Scheme: scheme,
Record: record.NewFakeRecorder(10),
Datastore: datastore,
PoolNamespacedName: types.NamespacedName{Name: "test-pool", Namespace: "default"},
}

// Create a request for the existing resource.
req := ctrl.Request{NamespacedName: types.NamespacedName{Name: "existing-model", Namespace: "default"}}

// Call Reconcile.
result, err := reconciler.Reconcile(context.Background(), req)
if err != nil {
t.Fatalf("expected no error when resource exists, got %v", err)
}

// Check that no requeue is requested.
if result.Requeue || result.RequeueAfter != 0 {
t.Errorf("expected no requeue, got %+v", result)
}

// Verify that the datastore was updated.
if _, ok := datastore.InferenceModels.Load(existingModel.Spec.ModelName); !ok {
t.Errorf("expected datastore to contain model %q", existingModel.Spec.ModelName)
}
}

func populateServiceMap(services ...*v1alpha1.InferenceModel) *sync.Map {
returnVal := &sync.Map{}
