Skip to content

Commit c5f664a

Browse files
committed
few updates in datastore
Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 8b9aef6 commit c5f664a

File tree

7 files changed

+77
-47
lines changed

7 files changed

+77
-47
lines changed

pkg/epp/controller/inferencemodel_reconciler_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/runtime"
2727
"k8s.io/apimachinery/pkg/types"
28+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2829
"k8s.io/client-go/tools/record"
2930
ctrl "sigs.k8s.io/controller-runtime"
3031
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -178,6 +179,7 @@ func TestInferenceModelReconciler(t *testing.T) {
178179
t.Run(test.name, func(t *testing.T) {
179180
// Create a fake client with no InferenceModel objects.
180181
scheme := runtime.NewScheme()
182+
_ = clientgoscheme.AddToScheme(scheme)
181183
_ = v1alpha2.Install(scheme)
182184
initObjs := []client.Object{}
183185
if test.model != nil {
@@ -186,6 +188,7 @@ func TestInferenceModelReconciler(t *testing.T) {
186188
for _, m := range test.modelsInAPIServer {
187189
initObjs = append(initObjs, m)
188190
}
191+
189192
fakeClient := fake.NewClientBuilder().
190193
WithScheme(scheme).
191194
WithObjects(initObjs...).
@@ -196,7 +199,7 @@ func TestInferenceModelReconciler(t *testing.T) {
196199
for _, m := range test.modelsInStore {
197200
ds.ModelSetIfOlder(m)
198201
}
199-
ds.PoolSet(pool)
202+
ds.PoolSet(context.Background(), fakeClient, pool)
200203
reconciler := &InferenceModelReconciler{
201204
Client: fakeClient,
202205
Record: record.NewFakeRecorder(10),

pkg/epp/controller/inferencepool_reconciler.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package controller
1818

1919
import (
2020
"context"
21-
"reflect"
2221

2322
"k8s.io/apimachinery/pkg/api/errors"
2423
"k8s.io/client-go/tools/record"
@@ -60,28 +59,15 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
6059
c.Datastore.Clear()
6160
return ctrl.Result{}, nil
6261
}
63-
64-
c.updateDatastore(ctx, infPool)
62+
// update pool in datastore
63+
if err := c.Datastore.PoolSet(ctx, c.Client, infPool); err != nil {
64+
logger.Error(err, "Failed to update datastore")
65+
return ctrl.Result{}, err
66+
}
6567

6668
return ctrl.Result{}, nil
6769
}
6870

69-
func (c *InferencePoolReconciler) updateDatastore(ctx context.Context, newPool *v1alpha2.InferencePool) {
70-
logger := log.FromContext(ctx)
71-
oldPool, err := c.Datastore.PoolGet()
72-
c.Datastore.PoolSet(newPool)
73-
if err != nil || !reflect.DeepEqual(newPool.Spec.Selector, oldPool.Spec.Selector) {
74-
logger.V(logutil.DEFAULT).Info("Updating inference pool endpoints", "selector", newPool.Spec.Selector)
75-
// A full resync is required to address two cases:
76-
// 1) At startup, the pod events may get processed before the pool is synced with the datastore,
77-
// and hence they will not be added to the store since pool selector is not known yet
78-
// 2) If the selector on the pool was updated, then we will not get any pod events, and so we need
79-
// to resync the whole pool: remove pods in the store that don't match the new selector and add
80-
// the ones that may have existed already to the store.
81-
c.Datastore.PodResyncAll(ctx, c.Client, newPool)
82-
}
83-
}
84-
8571
func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
8672
return ctrl.NewControllerManagedBy(mgr).
8773
For(&v1alpha2.InferencePool{}).

pkg/epp/controller/pod_reconciler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
7272

7373
func (c *PodReconciler) updateDatastore(logger logr.Logger, pod *corev1.Pod, pool *v1alpha2.InferencePool) {
7474
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
75-
if !pod.DeletionTimestamp.IsZero() || !c.Datastore.PoolLabelsMatch(pod.Labels) || !podutil.IsPodReady(pod) {
75+
if !podutil.IsPodReady(pod) || !c.Datastore.PoolLabelsMatch(pod.Labels) {
7676
logger.V(logutil.DEBUG).Info("Pod removed or not added", "name", namespacedName)
7777
c.Datastore.PodDelete(namespacedName)
7878
} else {

pkg/epp/controller/pod_reconciler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func TestPodReconciler(t *testing.T) {
182182

183183
// Configure the initial state of the datastore.
184184
store := datastore.NewDatastore(t.Context(), pmf)
185-
store.PoolSet(test.pool)
185+
store.PoolSet(t.Context(), fakeClient, test.pool)
186186
for _, pod := range test.existingPods {
187187
store.PodUpdateOrAddIfNotExist(pod, pool)
188188
}

pkg/epp/datastore/datastore.go

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"reflect"
2324
"sync"
2425

2526
corev1 "k8s.io/api/core/v1"
@@ -44,7 +45,7 @@ var (
4445
// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
4546
type Datastore interface {
4647
// InferencePool operations
47-
PoolSet(pool *v1alpha2.InferencePool)
48+
PoolSet(ctx context.Context, client client.Client, pool *v1alpha2.InferencePool) error
4849
PoolGet() (*v1alpha2.InferencePool, error)
4950
PoolHasSynced() bool
5051
PoolLabelsMatch(podLabels map[string]string) bool
@@ -63,7 +64,6 @@ type Datastore interface {
6364
PodList(func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
6465
PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.InferencePool) bool
6566
PodDelete(namespacedName types.NamespacedName)
66-
PodResyncAll(ctx context.Context, ctrlClient client.Client, pool *v1alpha2.InferencePool)
6767

6868
// Clears the store state, happens when the pool gets deleted.
6969
Clear()
@@ -102,10 +102,31 @@ func (ds *datastore) Clear() {
102102
}
103103

104104
// /// InferencePool APIs ///
105-
func (ds *datastore) PoolSet(pool *v1alpha2.InferencePool) {
105+
func (ds *datastore) PoolSet(ctx context.Context, client client.Client, pool *v1alpha2.InferencePool) error {
106+
if pool == nil {
107+
ds.Clear()
108+
return nil
109+
}
110+
logger := log.FromContext(ctx)
106111
ds.poolAndModelsMu.Lock()
107112
defer ds.poolAndModelsMu.Unlock()
113+
114+
oldPool := ds.pool
108115
ds.pool = pool
116+
if oldPool == nil || !reflect.DeepEqual(pool.Spec.Selector, oldPool.Spec.Selector) {
117+
logger.V(logutil.DEFAULT).Info("Updating inference pool endpoints", "selector", pool.Spec.Selector)
118+
// A full resync is required to address two cases:
119+
// 1) At startup, the pod events may get processed before the pool is synced with the datastore,
120+
// and hence they will not be added to the store since pool selector is not known yet
121+
// 2) If the selector on the pool was updated, then we will not get any pod events, and so we need
122+
// to resync the whole pool: remove pods in the store that don't match the new selector and add
123+
// the ones that may have existed already to the store.
124+
if err := ds.podResyncAll(ctx, client); err != nil {
125+
return fmt.Errorf("failed to update pods to match the updated pool selector")
126+
}
127+
}
128+
129+
return nil
109130
}
110131

111132
func (ds *datastore) PoolGet() (*v1alpha2.InferencePool, error) {
@@ -247,27 +268,35 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.In
247268
return ok
248269
}
249270

250-
func (ds *datastore) PodResyncAll(ctx context.Context, ctrlClient client.Client, pool *v1alpha2.InferencePool) {
271+
func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
272+
v, ok := ds.pods.LoadAndDelete(namespacedName)
273+
if ok {
274+
pmr := v.(backendmetrics.PodMetrics)
275+
pmr.StopRefreshLoop()
276+
}
277+
}
278+
279+
func (ds *datastore) podResyncAll(ctx context.Context, ctrlClient client.Client) error {
251280
logger := log.FromContext(ctx)
252281
podList := &corev1.PodList{}
253282
if err := ctrlClient.List(ctx, podList, &client.ListOptions{
254-
LabelSelector: selectorFromInferencePoolSelector(pool.Spec.Selector),
255-
Namespace: pool.Namespace,
283+
LabelSelector: selectorFromInferencePoolSelector(ds.pool.Spec.Selector),
284+
Namespace: ds.pool.Namespace,
256285
}); err != nil {
257-
log.FromContext(ctx).V(logutil.DEFAULT).Error(err, "Failed to list clients")
258-
return
286+
return fmt.Errorf("failed to list pods - %w", err)
259287
}
260288

261289
activePods := make(map[string]bool)
262290
for _, pod := range podList.Items {
263-
if podutil.IsPodReady(&pod) {
264-
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
265-
activePods[pod.Name] = true
266-
if ds.PodUpdateOrAddIfNotExist(&pod, pool) {
267-
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
268-
} else {
269-
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)
270-
}
291+
if !podutil.IsPodReady(&pod) {
292+
continue
293+
}
294+
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
295+
activePods[pod.Name] = true
296+
if ds.PodUpdateOrAddIfNotExist(&pod, ds.pool) {
297+
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
298+
} else {
299+
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)
271300
}
272301
}
273302

@@ -281,14 +310,8 @@ func (ds *datastore) PodResyncAll(ctx context.Context, ctrlClient client.Client,
281310
return true
282311
}
283312
ds.pods.Range(deleteFn)
284-
}
285313

286-
func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
287-
v, ok := ds.pods.LoadAndDelete(namespacedName)
288-
if ok {
289-
pmr := v.(backendmetrics.PodMetrics)
290-
pmr.StopRefreshLoop()
291-
}
314+
return nil
292315
}
293316

294317
func selectorFromInferencePoolSelector(selector map[v1alpha2.LabelKey]v1alpha2.LabelValue) labels.Selector {

pkg/epp/datastore/datastore_test.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ import (
2727
"github.com/stretchr/testify/assert"
2828
corev1 "k8s.io/api/core/v1"
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30+
"k8s.io/apimachinery/pkg/runtime"
3031
"k8s.io/apimachinery/pkg/types"
32+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
33+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
3134
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3235
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3336
testutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
@@ -71,9 +74,15 @@ func TestPool(t *testing.T) {
7174
}
7275
for _, tt := range tests {
7376
t.Run(tt.name, func(t *testing.T) {
77+
// Set up the scheme.
78+
scheme := runtime.NewScheme()
79+
_ = clientgoscheme.AddToScheme(scheme)
80+
fakeClient := fake.NewClientBuilder().
81+
WithScheme(scheme).
82+
Build()
7483
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
7584
datastore := NewDatastore(context.Background(), pmf)
76-
datastore.PoolSet(tt.inferencePool)
85+
datastore.PoolSet(context.Background(), fakeClient, tt.inferencePool)
7786
gotPool, gotErr := datastore.PoolGet()
7887
if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
7988
t.Errorf("Unexpected error diff (+got/-want): %s", diff)
@@ -320,9 +329,15 @@ func TestMetrics(t *testing.T) {
320329
t.Run(test.name, func(t *testing.T) {
321330
ctx, cancel := context.WithCancel(context.Background())
322331
defer cancel()
332+
// Set up the scheme.
333+
scheme := runtime.NewScheme()
334+
_ = clientgoscheme.AddToScheme(scheme)
335+
fakeClient := fake.NewClientBuilder().
336+
WithScheme(scheme).
337+
Build()
323338
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond)
324339
ds := NewDatastore(ctx, pmf)
325-
ds.PoolSet(inferencePool)
340+
ds.PoolSet(ctx, fakeClient, inferencePool)
326341
for _, pod := range test.storePods {
327342
ds.PodUpdateOrAddIfNotExist(pod, inferencePool)
328343
}

pkg/epp/util/pod/pod.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import (
2121
)
2222

2323
func IsPodReady(pod *corev1.Pod) bool {
24+
if !pod.DeletionTimestamp.IsZero() {
25+
return false
26+
}
2427
for _, condition := range pod.Status.Conditions {
2528
if condition.Type == corev1.PodReady {
2629
if condition.Status == corev1.ConditionTrue {

0 commit comments

Comments
 (0)