Skip to content

Commit 7792676

Browse files
nirrozenbaumrlakhtakia
authored andcommitted
few updates in datastore (kubernetes-sigs#713)
* few updates in datastore Signed-off-by: Nir Rozenbaum <[email protected]> * PoolSet documentation Signed-off-by: Nir Rozenbaum <[email protected]> * error phrasing Signed-off-by: Nir Rozenbaum <[email protected]> * removed unused pool arg from PodUpdateOrAddIfNotExist Signed-off-by: Nir Rozenbaum <[email protected]> * linter Signed-off-by: Nir Rozenbaum <[email protected]> --------- Signed-off-by: Nir Rozenbaum <[email protected]>
1 parent 76a562f commit 7792676

File tree

7 files changed

+88
-58
lines changed

7 files changed

+88
-58
lines changed

pkg/epp/controller/inferencemodel_reconciler_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/runtime"
2727
"k8s.io/apimachinery/pkg/types"
28+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2829
"k8s.io/client-go/tools/record"
2930
ctrl "sigs.k8s.io/controller-runtime"
3031
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -178,6 +179,7 @@ func TestInferenceModelReconciler(t *testing.T) {
178179
t.Run(test.name, func(t *testing.T) {
179180
// Create a fake client with no InferenceModel objects.
180181
scheme := runtime.NewScheme()
182+
_ = clientgoscheme.AddToScheme(scheme)
181183
_ = v1alpha2.Install(scheme)
182184
initObjs := []client.Object{}
183185
if test.model != nil {
@@ -186,6 +188,7 @@ func TestInferenceModelReconciler(t *testing.T) {
186188
for _, m := range test.modelsInAPIServer {
187189
initObjs = append(initObjs, m)
188190
}
191+
189192
fakeClient := fake.NewClientBuilder().
190193
WithScheme(scheme).
191194
WithObjects(initObjs...).
@@ -196,7 +199,7 @@ func TestInferenceModelReconciler(t *testing.T) {
196199
for _, m := range test.modelsInStore {
197200
ds.ModelSetIfOlder(m)
198201
}
199-
ds.PoolSet(pool)
202+
_ = ds.PoolSet(context.Background(), fakeClient, pool)
200203
reconciler := &InferenceModelReconciler{
201204
Client: fakeClient,
202205
Record: record.NewFakeRecorder(10),

pkg/epp/controller/inferencepool_reconciler.go

+5-19
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package controller
1818

1919
import (
2020
"context"
21-
"reflect"
2221

2322
"k8s.io/apimachinery/pkg/api/errors"
2423
"k8s.io/client-go/tools/record"
@@ -60,28 +59,15 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
6059
c.Datastore.Clear()
6160
return ctrl.Result{}, nil
6261
}
63-
64-
c.updateDatastore(ctx, infPool)
62+
// update pool in datastore
63+
if err := c.Datastore.PoolSet(ctx, c.Client, infPool); err != nil {
64+
logger.Error(err, "Failed to update datastore")
65+
return ctrl.Result{}, err
66+
}
6567

6668
return ctrl.Result{}, nil
6769
}
6870

69-
func (c *InferencePoolReconciler) updateDatastore(ctx context.Context, newPool *v1alpha2.InferencePool) {
70-
logger := log.FromContext(ctx)
71-
oldPool, err := c.Datastore.PoolGet()
72-
c.Datastore.PoolSet(newPool)
73-
if err != nil || !reflect.DeepEqual(newPool.Spec.Selector, oldPool.Spec.Selector) {
74-
logger.V(logutil.DEFAULT).Info("Updating inference pool endpoints", "selector", newPool.Spec.Selector)
75-
// A full resync is required to address two cases:
76-
// 1) At startup, the pod events may get processed before the pool is synced with the datastore,
77-
// and hence they will not be added to the store since pool selector is not known yet
78-
// 2) If the selector on the pool was updated, then we will not get any pod events, and so we need
79-
// to resync the whole pool: remove pods in the store that don't match the new selector and add
80-
// the ones that may have existed already to the store.
81-
c.Datastore.PodResyncAll(ctx, c.Client, newPool)
82-
}
83-
}
84-
8571
func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
8672
return ctrl.NewControllerManagedBy(mgr).
8773
For(&v1alpha2.InferencePool{}).

pkg/epp/controller/pod_reconciler.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
ctrl "sigs.k8s.io/controller-runtime"
2828
"sigs.k8s.io/controller-runtime/pkg/client"
2929
"sigs.k8s.io/controller-runtime/pkg/log"
30-
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3130
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
3231
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3332
podutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/pod"
@@ -41,8 +40,7 @@ type PodReconciler struct {
4140

4241
func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
4342
logger := log.FromContext(ctx)
44-
pool, err := c.Datastore.PoolGet()
45-
if err != nil {
43+
if !c.Datastore.PoolHasSynced() {
4644
logger.V(logutil.TRACE).Info("Skipping reconciling Pod because the InferencePool is not available yet")
4745
// When the inferencePool is initialized it lists the appropriate pods and populates the datastore, so no need to requeue.
4846
return ctrl.Result{}, nil
@@ -60,7 +58,7 @@ func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
6058
return ctrl.Result{}, err
6159
}
6260

63-
c.updateDatastore(logger, pod, pool)
61+
c.updateDatastore(logger, pod)
6462
return ctrl.Result{}, nil
6563
}
6664

@@ -70,13 +68,13 @@ func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error {
7068
Complete(c)
7169
}
7270

73-
func (c *PodReconciler) updateDatastore(logger logr.Logger, pod *corev1.Pod, pool *v1alpha2.InferencePool) {
71+
func (c *PodReconciler) updateDatastore(logger logr.Logger, pod *corev1.Pod) {
7472
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
75-
if !pod.DeletionTimestamp.IsZero() || !c.Datastore.PoolLabelsMatch(pod.Labels) || !podutil.IsPodReady(pod) {
73+
if !podutil.IsPodReady(pod) || !c.Datastore.PoolLabelsMatch(pod.Labels) {
7674
logger.V(logutil.DEBUG).Info("Pod removed or not added", "name", namespacedName)
7775
c.Datastore.PodDelete(namespacedName)
7876
} else {
79-
if c.Datastore.PodUpdateOrAddIfNotExist(pod, pool) {
77+
if c.Datastore.PodUpdateOrAddIfNotExist(pod) {
8078
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
8179
} else {
8280
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)

pkg/epp/controller/pod_reconciler_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -182,9 +182,9 @@ func TestPodReconciler(t *testing.T) {
182182

183183
// Configure the initial state of the datastore.
184184
store := datastore.NewDatastore(t.Context(), pmf)
185-
store.PoolSet(test.pool)
185+
_ = store.PoolSet(t.Context(), fakeClient, test.pool)
186186
for _, pod := range test.existingPods {
187-
store.PodUpdateOrAddIfNotExist(pod, pool)
187+
store.PodUpdateOrAddIfNotExist(pod)
188188
}
189189

190190
podReconciler := &PodReconciler{Client: fakeClient, Datastore: store}

pkg/epp/datastore/datastore.go

+52-26
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"reflect"
2324
"sync"
2425

2526
corev1 "k8s.io/api/core/v1"
@@ -44,7 +45,10 @@ var (
4445
// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
4546
type Datastore interface {
4647
// InferencePool operations
47-
PoolSet(pool *v1alpha2.InferencePool)
48+
// PoolSet sets the given pool in datastore. If the given pool has different label selector than the previous pool
49+
// that was stored, the function triggers a resync of the pods to keep the datastore updated. If the given pool
50+
// is nil, this call triggers the datastore.Clear() function.
51+
PoolSet(ctx context.Context, client client.Client, pool *v1alpha2.InferencePool) error
4852
PoolGet() (*v1alpha2.InferencePool, error)
4953
PoolHasSynced() bool
5054
PoolLabelsMatch(podLabels map[string]string) bool
@@ -60,10 +64,9 @@ type Datastore interface {
6064
// PodGetAll returns all pods and metrics, including fresh and stale.
6165
PodGetAll() []backendmetrics.PodMetrics
6266
// PodList lists pods matching the given predicate.
63-
PodList(func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
64-
PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.InferencePool) bool
67+
PodList(predicate func(backendmetrics.PodMetrics) bool) []backendmetrics.PodMetrics
68+
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
6569
PodDelete(namespacedName types.NamespacedName)
66-
PodResyncAll(ctx context.Context, ctrlClient client.Client, pool *v1alpha2.InferencePool)
6770

6871
// Clears the store state, happens when the pool gets deleted.
6972
Clear()
@@ -102,10 +105,31 @@ func (ds *datastore) Clear() {
102105
}
103106

104107
// /// InferencePool APIs ///
105-
func (ds *datastore) PoolSet(pool *v1alpha2.InferencePool) {
108+
func (ds *datastore) PoolSet(ctx context.Context, client client.Client, pool *v1alpha2.InferencePool) error {
109+
if pool == nil {
110+
ds.Clear()
111+
return nil
112+
}
113+
logger := log.FromContext(ctx)
106114
ds.poolAndModelsMu.Lock()
107115
defer ds.poolAndModelsMu.Unlock()
116+
117+
oldPool := ds.pool
108118
ds.pool = pool
119+
if oldPool == nil || !reflect.DeepEqual(pool.Spec.Selector, oldPool.Spec.Selector) {
120+
logger.V(logutil.DEFAULT).Info("Updating inference pool endpoints", "selector", pool.Spec.Selector)
121+
// A full resync is required to address two cases:
122+
// 1) At startup, the pod events may get processed before the pool is synced with the datastore,
123+
// and hence they will not be added to the store since pool selector is not known yet
124+
// 2) If the selector on the pool was updated, then we will not get any pod events, and so we need
125+
// to resync the whole pool: remove pods in the store that don't match the new selector and add
126+
// the ones that may have existed already to the store.
127+
if err := ds.podResyncAll(ctx, client); err != nil {
128+
return fmt.Errorf("failed to update pods according to the pool selector - %w", err)
129+
}
130+
}
131+
132+
return nil
109133
}
110134

111135
func (ds *datastore) PoolGet() (*v1alpha2.InferencePool, error) {
@@ -229,7 +253,7 @@ func (ds *datastore) PodList(predicate func(backendmetrics.PodMetrics) bool) []b
229253
return res
230254
}
231255

232-
func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.InferencePool) bool {
256+
func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
233257
namespacedName := types.NamespacedName{
234258
Name: pod.Name,
235259
Namespace: pod.Namespace,
@@ -247,27 +271,35 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod, pool *v1alpha2.In
247271
return ok
248272
}
249273

250-
func (ds *datastore) PodResyncAll(ctx context.Context, ctrlClient client.Client, pool *v1alpha2.InferencePool) {
274+
func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
275+
v, ok := ds.pods.LoadAndDelete(namespacedName)
276+
if ok {
277+
pmr := v.(backendmetrics.PodMetrics)
278+
pmr.StopRefreshLoop()
279+
}
280+
}
281+
282+
func (ds *datastore) podResyncAll(ctx context.Context, ctrlClient client.Client) error {
251283
logger := log.FromContext(ctx)
252284
podList := &corev1.PodList{}
253285
if err := ctrlClient.List(ctx, podList, &client.ListOptions{
254-
LabelSelector: selectorFromInferencePoolSelector(pool.Spec.Selector),
255-
Namespace: pool.Namespace,
286+
LabelSelector: selectorFromInferencePoolSelector(ds.pool.Spec.Selector),
287+
Namespace: ds.pool.Namespace,
256288
}); err != nil {
257-
log.FromContext(ctx).V(logutil.DEFAULT).Error(err, "Failed to list clients")
258-
return
289+
return fmt.Errorf("failed to list pods - %w", err)
259290
}
260291

261292
activePods := make(map[string]bool)
262293
for _, pod := range podList.Items {
263-
if podutil.IsPodReady(&pod) {
264-
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
265-
activePods[pod.Name] = true
266-
if ds.PodUpdateOrAddIfNotExist(&pod, pool) {
267-
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
268-
} else {
269-
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)
270-
}
294+
if !podutil.IsPodReady(&pod) {
295+
continue
296+
}
297+
namespacedName := types.NamespacedName{Name: pod.Name, Namespace: pod.Namespace}
298+
activePods[pod.Name] = true
299+
if ds.PodUpdateOrAddIfNotExist(&pod) {
300+
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
301+
} else {
302+
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)
271303
}
272304
}
273305

@@ -281,14 +313,8 @@ func (ds *datastore) PodResyncAll(ctx context.Context, ctrlClient client.Client,
281313
return true
282314
}
283315
ds.pods.Range(deleteFn)
284-
}
285316

286-
func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
287-
v, ok := ds.pods.LoadAndDelete(namespacedName)
288-
if ok {
289-
pmr := v.(backendmetrics.PodMetrics)
290-
pmr.StopRefreshLoop()
291-
}
317+
return nil
292318
}
293319

294320
func selectorFromInferencePoolSelector(selector map[v1alpha2.LabelKey]v1alpha2.LabelValue) labels.Selector {

pkg/epp/datastore/datastore_test.go

+17-3
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ import (
2929
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3030
"k8s.io/apimachinery/pkg/runtime"
3131
"k8s.io/apimachinery/pkg/types"
32+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3233
"sigs.k8s.io/controller-runtime/pkg/client"
3334
"sigs.k8s.io/controller-runtime/pkg/client/config"
35+
"sigs.k8s.io/controller-runtime/pkg/client/fake"
3436
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
3537
backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
3638
testutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/testing"
@@ -74,9 +76,15 @@ func TestPool(t *testing.T) {
7476
}
7577
for _, tt := range tests {
7678
t.Run(tt.name, func(t *testing.T) {
79+
// Set up the scheme.
80+
scheme := runtime.NewScheme()
81+
_ = clientgoscheme.AddToScheme(scheme)
82+
fakeClient := fake.NewClientBuilder().
83+
WithScheme(scheme).
84+
Build()
7785
pmf := backendmetrics.NewPodMetricsFactory(&backendmetrics.FakePodMetricsClient{}, time.Second)
7886
datastore := NewDatastore(context.Background(), pmf)
79-
datastore.PoolSet(tt.inferencePool)
87+
_ = datastore.PoolSet(context.Background(), fakeClient, tt.inferencePool)
8088
gotPool, gotErr := datastore.PoolGet()
8189
if diff := cmp.Diff(tt.wantErr, gotErr, cmpopts.EquateErrors()); diff != "" {
8290
t.Errorf("Unexpected error diff (+got/-want): %s", diff)
@@ -323,11 +331,17 @@ func TestMetrics(t *testing.T) {
323331
t.Run(test.name, func(t *testing.T) {
324332
ctx, cancel := context.WithCancel(context.Background())
325333
defer cancel()
334+
// Set up the scheme.
335+
scheme := runtime.NewScheme()
336+
_ = clientgoscheme.AddToScheme(scheme)
337+
fakeClient := fake.NewClientBuilder().
338+
WithScheme(scheme).
339+
Build()
326340
pmf := backendmetrics.NewPodMetricsFactory(test.pmc, time.Millisecond)
327341
ds := NewDatastore(ctx, pmf)
328-
ds.PoolSet(inferencePool)
342+
_ = ds.PoolSet(ctx, fakeClient, inferencePool)
329343
for _, pod := range test.storePods {
330-
ds.PodUpdateOrAddIfNotExist(pod, inferencePool)
344+
ds.PodUpdateOrAddIfNotExist(pod)
331345
}
332346
assert.EventuallyWithT(t, func(t *assert.CollectT) {
333347
got := ds.PodGetAll()

pkg/epp/util/pod/pod.go

+3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import (
2121
)
2222

2323
func IsPodReady(pod *corev1.Pod) bool {
24+
if !pod.DeletionTimestamp.IsZero() {
25+
return false
26+
}
2427
for _, condition := range pod.Status.Conditions {
2528
if condition.Type == corev1.PodReady {
2629
if condition.Status == corev1.ConditionTrue {

0 commit comments

Comments
 (0)