@@ -18,8 +18,8 @@ package controller
18
18
19
19
import (
20
20
"context"
21
+ "fmt"
21
22
22
- "github.com/go-logr/logr"
23
23
"k8s.io/apimachinery/pkg/api/errors"
24
24
"k8s.io/apimachinery/pkg/runtime"
25
25
"k8s.io/apimachinery/pkg/types"
@@ -32,6 +32,10 @@ import (
32
32
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
33
33
)
34
34
35
+ const (
36
+ modelNameKey = "spec.modelName"
37
+ )
38
+
35
39
type InferenceModelReconciler struct {
36
40
client.Client
37
41
Scheme * runtime.Scheme
@@ -44,45 +48,100 @@ func (c *InferenceModelReconciler) Reconcile(ctx context.Context, req ctrl.Reque
44
48
if req .Namespace != c .PoolNamespacedName .Namespace {
45
49
return ctrl.Result {}, nil
46
50
}
51
+ logger := log .FromContext (ctx ).V (logutil .DEFAULT ).WithValues ("inferenceModel" , req .Name )
52
+ ctx = ctrl .LoggerInto (ctx , logger )
47
53
48
- logger := log .FromContext (ctx )
49
- loggerDefault := logger .V (logutil .DEFAULT )
50
- loggerDefault .Info ("Reconciling InferenceModel" , "name" , req .NamespacedName )
54
+ logger .Info ("Reconciling InferenceModel" )
51
55
52
56
infModel := & v1alpha1.InferenceModel {}
57
+ notFound := false
53
58
if err := c .Get (ctx , req .NamespacedName , infModel ); err != nil {
54
- if errors .IsNotFound (err ) {
55
- loggerDefault .Info ("InferenceModel not found. Removing from datastore since object must be deleted" , "name" , req .NamespacedName )
56
- c .Datastore .ModelDelete (infModel .Spec .ModelName )
57
- return ctrl.Result {}, nil
59
+ if ! errors .IsNotFound (err ) {
60
+ logger .Error (err , "Unable to get InferenceModel" )
61
+ return ctrl.Result {}, err
58
62
}
59
- loggerDefault .Error (err , "Unable to get InferenceModel" , "name" , req .NamespacedName )
63
+ notFound = true
64
+ }
65
+
66
+ if notFound || ! infModel .DeletionTimestamp .IsZero () || infModel .Spec .PoolRef .Name != c .PoolNamespacedName .Name {
67
+ // InferenceModel object got deleted or changed the referenced pool.
68
+ err := c .handleModelDeleted (ctx , req .NamespacedName )
60
69
return ctrl.Result {}, err
61
- } else if ! infModel .DeletionTimestamp .IsZero () {
62
- loggerDefault .Info ("InferenceModel is marked for deletion. Removing from datastore" , "name" , req .NamespacedName )
63
- c .Datastore .ModelDelete (infModel .Spec .ModelName )
64
- return ctrl.Result {}, nil
65
70
}
66
71
67
- c .updateDatastore (logger , infModel )
72
+ // Add or update if the InferenceModel instance has a creation timestamp older than the existing entry of the model.
73
+ logger = logger .WithValues ("poolRef" , infModel .Spec .PoolRef ).WithValues ("modelName" , infModel .Spec .ModelName )
74
+ if ! c .Datastore .ModelSetIfOlder (infModel ) {
75
+ logger .Info ("Skipping InferenceModel, existing instance has older creation timestamp" )
76
+
77
+ }
78
+ logger .Info ("Added/Updated InferenceModel" )
79
+
68
80
return ctrl.Result {}, nil
69
81
}
70
82
71
- func (c * InferenceModelReconciler ) updateDatastore (logger logr.Logger , infModel * v1alpha1.InferenceModel ) {
72
- loggerDefault := logger .V (logutil .DEFAULT )
83
+ func (c * InferenceModelReconciler ) handleModelDeleted (ctx context.Context , req types.NamespacedName ) error {
84
+ logger := log .FromContext (ctx )
85
+
86
+ // We will lookup the modelName assocaited with this object to search for
87
+ // other instance referencing the same ModelName if exist to store the oldest in
88
+ // its place. This ensures that the InferenceModel with the oldest creation
89
+ // timestamp is active.
90
+ existing , exists := c .Datastore .ModelGetByObjName (req )
91
+ if ! exists {
92
+ // No entry exists in the first place, nothing to do.
93
+ return nil
94
+ }
95
+ // Delete the internal object, it may be replaced with another version below.
96
+ c .Datastore .ModelDelete (req )
97
+ logger .Info ("InferenceModel removed from datastore" , "poolRef" , existing .Spec .PoolRef , "modelName" , existing .Spec .ModelName )
73
98
74
- if infModel .Spec .PoolRef .Name == c .PoolNamespacedName .Name {
75
- loggerDefault .Info ("Updating datastore" , "poolRef" , infModel .Spec .PoolRef , "serverPoolName" , c .PoolNamespacedName )
76
- loggerDefault .Info ("Adding/Updating InferenceModel" , "modelName" , infModel .Spec .ModelName )
77
- c .Datastore .ModelSet (infModel )
78
- return
99
+ // List all InferenceModels with a matching ModelName.
100
+ var models v1alpha1.InferenceModelList
101
+ if err := c .List (ctx , & models , client.MatchingFields {modelNameKey : existing .Spec .ModelName }, client .InNamespace (c .PoolNamespacedName .Namespace )); err != nil {
102
+ return fmt .Errorf ("listing models that match the modelName %s: %w" , existing .Spec .ModelName , err )
79
103
}
80
- loggerDefault .Info ("Removing/Not adding InferenceModel" , "modelName" , infModel .Spec .ModelName )
81
- // If we get here. The model is not relevant to this pool, remove.
82
- c .Datastore .ModelDelete (infModel .Spec .ModelName )
104
+ if len (models .Items ) == 0 {
105
+ // No other instances of InferenceModels with this ModelName exists.
106
+ return nil
107
+ }
108
+
109
+ var oldest * v1alpha1.InferenceModel
110
+ for i := range models .Items {
111
+ m := & models .Items [i ]
112
+ if existing .Spec .ModelName != existing .Spec .ModelName || // The index should filter those out, but just in case!
113
+ m .Spec .PoolRef .Name != c .PoolNamespacedName .Name || // We don't care about other pools, we could setup an index on this too!
114
+ m .Name == existing .Name { // We don't care about the same object, it could be in the list if it was only marked for deletion, but not yet deleted.
115
+ continue
116
+ }
117
+ if oldest == nil || m .ObjectMeta .CreationTimestamp .Before (& oldest .ObjectMeta .CreationTimestamp ) {
118
+ oldest = m
119
+ }
120
+ }
121
+ if oldest != nil && c .Datastore .ModelSetIfOlder (oldest ) {
122
+ logger .Info ("InferenceModel replaced." ,
123
+ "poolRef" , oldest .Spec .PoolRef ,
124
+ "modelName" , oldest .Spec .ModelName ,
125
+ "newInferenceModel" , types.NamespacedName {Name : oldest .Name , Namespace : oldest .Namespace })
126
+ }
127
+
128
+ return nil
83
129
}
84
130
85
- func (c * InferenceModelReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
131
+ func indexInferenceModelsByModelName (obj client.Object ) []string {
132
+ m , ok := obj .(* v1alpha1.InferenceModel )
133
+ if ! ok {
134
+ return nil
135
+ }
136
+ return []string {string (m .Spec .ModelName )}
137
+ }
138
+
139
+ func (c * InferenceModelReconciler ) SetupWithManager (ctx context.Context , mgr ctrl.Manager ) error {
140
+ // Create an index on ModelName for InferenceModel objects.
141
+ indexer := mgr .GetFieldIndexer ()
142
+ if err := indexer .IndexField (ctx , & v1alpha1.InferenceModel {}, modelNameKey , indexInferenceModelsByModelName ); err != nil {
143
+ return fmt .Errorf ("setting index on ModelName for InferenceModel: %w" , err )
144
+ }
86
145
return ctrl .NewControllerManagedBy (mgr ).
87
146
For (& v1alpha1.InferenceModel {}).
88
147
Complete (c )
0 commit comments