Skip to content

Commit ed639b7

Browse files
committed
implementing pod cache flushing logic
1 parent 8cc1b74 commit ed639b7

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

pkg/ext-proc/backend/datastore.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
package backend
22

33
import (
4+
"context"
45
"errors"
56
"math/rand"
7+
"strconv"
68
"sync"
79

810
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
911
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1012
corev1 "k8s.io/api/core/v1"
1113
"k8s.io/apimachinery/pkg/labels"
1214
"k8s.io/klog/v2"
15+
"sigs.k8s.io/controller-runtime/pkg/client"
1316
)
1417

1518
func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
@@ -114,12 +117,36 @@ func IsCritical(model *v1alpha1.InferenceModel) bool {
114117
}
115118

116119
func (ds *K8sDatastore) LabelsMatch(podLabels map[string]string) bool {
117-
poolSelector := labels.SelectorFromSet(whimsicalWondersOfAliasTypes(ds.inferencePool.Spec.Selector))
120+
poolSelector := selectorFromInferencePoolSelector(ds.inferencePool.Spec.Selector)
118121
podSet := labels.Set(podLabels)
119122
return poolSelector.Matches(podSet)
120123
}
121124

122-
func whimsicalWondersOfAliasTypes(labels map[v1alpha1.LabelKey]v1alpha1.LabelValue) map[string]string {
125+
func (ds *K8sDatastore) flushPodsAndRefetch(ctx context.Context, ctrlClient client.Client, newServerPool *v1alpha1.InferencePool) {
126+
podList := &corev1.PodList{}
127+
if err := ctrlClient.List(ctx, podList, selectorFromInferencePoolSelector(newServerPool.Spec.Selector).(client.MatchingLabelsSelector)); err != nil {
128+
klog.Error(err, "error listing clients")
129+
}
130+
podMap := sync.Map{}
131+
132+
for _, k8sPod := range podList.Items {
133+
pod := Pod{
134+
Name: k8sPod.Name,
135+
Address: k8sPod.Status.PodIP + ":" + strconv.Itoa(int(newServerPool.Spec.TargetPortNumber)),
136+
}
137+
podMap.Store(pod, true)
138+
}
139+
140+
// Clear is thread-safe, so if there are any in-flight accesses this should prevent odd data behavior. Then we replace after.
141+
ds.pods.Clear()
142+
ds.pods = &podMap
143+
}
144+
145+
func selectorFromInferencePoolSelector(selector map[v1alpha1.LabelKey]v1alpha1.LabelValue) labels.Selector {
146+
return labels.SelectorFromSet(stripLabelKeyAliasFromLabelMap(selector))
147+
}
148+
149+
func stripLabelKeyAliasFromLabelMap(labels map[v1alpha1.LabelKey]v1alpha1.LabelValue) map[string]string {
123150
outMap := make(map[string]string)
124151
for k, v := range labels {
125152
outMap[string(k)] = string(v)

pkg/ext-proc/backend/inferencepool_reconciler.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package backend
22

33
import (
44
"context"
5+
"reflect"
56

67
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
78
"k8s.io/apimachinery/pkg/runtime"
@@ -34,7 +35,9 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
3435
klog.Error(err, "unable to get InferencePool")
3536
return ctrl.Result{}, err
3637
}
37-
38+
if !reflect.DeepEqual(serverPool.Spec.Selector, c.Datastore.inferencePool.Spec.Selector) {
39+
c.Datastore.flushPodsAndRefetch(ctx, c.Client, serverPool)
40+
}
3841
c.updateDatastore(serverPool)
3942

4043
return ctrl.Result{}, nil

0 commit comments

Comments
 (0)