diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index b466a2ed5..dde43bde3 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -8,6 +8,7 @@ import ( "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" ) @@ -111,3 +112,17 @@ func IsCritical(model *v1alpha1.InferenceModel) bool { } return false } + +func (ds *K8sDatastore) LabelsMatch(podLabels map[string]string) bool { + poolSelector := labels.SelectorFromSet(whimsicalWondersOfAliasTypes(ds.inferencePool.Spec.Selector)) + podSet := labels.Set(podLabels) + return poolSelector.Matches(podSet) +} + +func whimsicalWondersOfAliasTypes(labels map[v1alpha1.LabelKey]v1alpha1.LabelValue) map[string]string { + outMap := make(map[string]string) + for k, v := range labels { + outMap[string(k)] = string(v) + } + return outMap +} diff --git a/pkg/ext-proc/backend/endpointslice_reconciler.go b/pkg/ext-proc/backend/endpointslice_reconciler.go deleted file mode 100644 index a2a9790f2..000000000 --- a/pkg/ext-proc/backend/endpointslice_reconciler.go +++ /dev/null @@ -1,109 +0,0 @@ -package backend - -import ( - "context" - "strconv" - "time" - - "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" - logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" - discoveryv1 "k8s.io/api/discovery/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" - klog "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/predicate" -) - -var ( - serviceOwnerLabel = "kubernetes.io/service-name" -) - -type EndpointSliceReconciler struct { - client.Client - Scheme *runtime.Scheme - Record record.EventRecorder - ServiceName string - Zone string - Datastore *K8sDatastore -} - -func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - inferencePool, err := c.Datastore.getInferencePool() - if err != nil { - klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err) - return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil - } - - klog.V(logutil.DEFAULT).Info("Reconciling EndpointSlice ", req.NamespacedName) - - endpointSlice := &discoveryv1.EndpointSlice{} - if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil { - klog.Errorf("Unable to get EndpointSlice: %v", err) - return ctrl.Result{}, err - } - c.updateDatastore(endpointSlice, inferencePool) - - return ctrl.Result{}, nil -} - -// TODO: Support multiple endpointslices for a single service -func (c *EndpointSliceReconciler) updateDatastore( - slice *discoveryv1.EndpointSlice, - inferencePool *v1alpha1.InferencePool) { - podMap := make(map[Pod]bool) - - for _, endpoint := range slice.Endpoints { - klog.V(logutil.DEFAULT).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint) - if c.validPod(endpoint) { - pod := Pod{ - Name: endpoint.TargetRef.Name, - Address: endpoint.Addresses[0] + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)), - } - podMap[pod] = true - klog.V(logutil.DEFAULT).Infof("Storing pod %v", pod) - c.Datastore.pods.Store(pod, true) - } - } - - removeOldPods := func(k, v any) bool { - pod, ok := k.(Pod) - if !ok { - klog.Errorf("Unable to cast key to Pod: %v", k) - return false - } - if _, ok := podMap[pod]; !ok { - klog.V(logutil.DEFAULT).Infof("Removing pod %v", pod) - c.Datastore.pods.Delete(pod) - } - return true - } - c.Datastore.pods.Range(removeOldPods) -} - -func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error { - ownsEndPointSlice := func(object client.Object) bool { - // Check if the object is an EndpointSlice - endpointSlice, ok := object.(*discoveryv1.EndpointSlice) - if !ok { - return false - } - - gotLabel := endpointSlice.ObjectMeta.Labels[serviceOwnerLabel] - wantLabel := c.ServiceName - return gotLabel == wantLabel - } - - return ctrl.NewControllerManagedBy(mgr). - For(&discoveryv1.EndpointSlice{}, - builder.WithPredicates(predicate.NewPredicateFuncs(ownsEndPointSlice))). - Complete(c) -} - -func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool { - validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone - return validZone && *endpoint.Conditions.Ready - -} diff --git a/pkg/ext-proc/backend/endpointslice_reconcilier_test.go b/pkg/ext-proc/backend/endpointslice_reconcilier_test.go deleted file mode 100644 index e3c927ba8..000000000 --- a/pkg/ext-proc/backend/endpointslice_reconcilier_test.go +++ /dev/null @@ -1,202 +0,0 @@ -package backend - -import ( - "sync" - "testing" - - "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" - v1 "k8s.io/api/core/v1" - discoveryv1 "k8s.io/api/discovery/v1" -) - -var ( - basePod1 = Pod{Name: "pod1"} - basePod2 = Pod{Name: "pod2"} - basePod3 = Pod{Name: "pod3"} -) - -func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { - tests := []struct { - name string - datastore *K8sDatastore - incomingSlice *discoveryv1.EndpointSlice - wantPods *sync.Map - }{ - { - name: "Add new pod", - datastore: &K8sDatastore{ - pods: populateMap(basePod1, basePod2), - inferencePool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - }, - }, - }, - incomingSlice: &discoveryv1.EndpointSlice{ - Endpoints: []discoveryv1.Endpoint{ - { - TargetRef: &v1.ObjectReference{ - Name: "pod1", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod2", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod3", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - }, - }, - wantPods: populateMap(basePod1, basePod2, basePod3), - }, - { - name: "New pod, but its not ready yet. Do not add.", - datastore: &K8sDatastore{ - pods: populateMap(basePod1, basePod2), - inferencePool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - }, - }, - }, - incomingSlice: &discoveryv1.EndpointSlice{ - Endpoints: []discoveryv1.Endpoint{ - { - TargetRef: &v1.ObjectReference{ - Name: "pod1", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod2", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod3", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: new(bool), - }, - Addresses: []string{"0.0.0.0"}, - }, - }, - }, - wantPods: populateMap(basePod1, basePod2), - }, - { - name: "Existing pod not ready, new pod added, and is ready", - datastore: &K8sDatastore{ - pods: populateMap(basePod1, basePod2), - inferencePool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - }, - }, - }, - incomingSlice: &discoveryv1.EndpointSlice{ - Endpoints: []discoveryv1.Endpoint{ - { - TargetRef: &v1.ObjectReference{ - Name: "pod1", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: new(bool), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod2", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod3", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - }, - }, - wantPods: populateMap(basePod3, basePod2), - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - endpointSliceReconciler := &EndpointSliceReconciler{Datastore: test.datastore, Zone: ""} - endpointSliceReconciler.updateDatastore(test.incomingSlice, test.datastore.inferencePool) - - if mapsEqual(endpointSliceReconciler.Datastore.pods, test.wantPods) { - t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", - endpointSliceReconciler.Datastore.pods, - test.wantPods) - } - }) - } -} - -func mapsEqual(map1, map2 *sync.Map) bool { - equal := true - - map1.Range(func(k, v any) bool { - if _, ok := map2.Load(k); !ok { - equal = false - return false - } - return true - }) - map2.Range(func(k, v any) bool { - if _, ok := map1.Load(k); !ok { - equal = false - return false - } - return true - }) - - return equal -} - -func truePointer() *bool { - primitivePointersAreSilly := true - return &primitivePointersAreSilly -} diff --git a/pkg/ext-proc/backend/inferencemodel_reconciler_test.go b/pkg/ext-proc/backend/inferencemodel_reconciler_test.go index 5609ca532..117766b9c 100644 --- a/pkg/ext-proc/backend/inferencemodel_reconciler_test.go +++ b/pkg/ext-proc/backend/inferencemodel_reconciler_test.go @@ -146,3 +146,24 @@ func populateServiceMap(services ...*v1alpha1.InferenceModel) *sync.Map { } return returnVal } + +func mapsEqual(map1, map2 *sync.Map) bool { + equal := true + + map1.Range(func(k, v any) bool { + if _, ok := map2.Load(k); !ok { + equal = false + return false + } + return true + }) + map2.Range(func(k, v any) bool { + if _, ok := map1.Load(k); !ok { + equal = false + return false + } + return true + }) + + return equal +} diff --git a/pkg/ext-proc/backend/pod_reconciler.go b/pkg/ext-proc/backend/pod_reconciler.go new file mode 100644 index 000000000..5e7ced53e --- /dev/null +++ b/pkg/ext-proc/backend/pod_reconciler.go @@ -0,0 +1,73 @@ +package backend + +import ( + "context" + "strconv" + "time" + + "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" + logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type PodReconciler struct { + client.Client + Datastore *K8sDatastore + Scheme *runtime.Scheme + Record record.EventRecorder +} + +func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + inferencePool, err := c.Datastore.getInferencePool() + if err != nil { + klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil + } + + klog.V(logutil.VERBOSE).Info("reconciling Pod", req.NamespacedName) + + pod := &corev1.Pod{} + if err := c.Get(ctx, req.NamespacedName, pod); err != nil { + klog.Error(err, "unable to get InferencePool") + return ctrl.Result{}, err + } + + c.updateDatastore(pod, inferencePool) + + return ctrl.Result{}, nil +} + +func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Pod{}). + Complete(c) +} + +func (c *PodReconciler) updateDatastore(k8sPod *corev1.Pod, inferencePool *v1alpha1.InferencePool) { + pod := Pod{ + Name: k8sPod.Name, + Address: k8sPod.Status.PodIP + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)), + } + if !c.Datastore.LabelsMatch(k8sPod.ObjectMeta.Labels) || !podIsReady(k8sPod) { + c.Datastore.pods.Delete(pod) + } else { + c.Datastore.pods.Store(pod, true) + } +} + +func podIsReady(pod *corev1.Pod) bool { + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady { + if condition.Status == corev1.ConditionTrue { + return true + } + break + } + } + return false +} diff --git a/pkg/ext-proc/backend/pod_reconciler_test.go b/pkg/ext-proc/backend/pod_reconciler_test.go new file mode 100644 index 000000000..1e8927493 --- /dev/null +++ b/pkg/ext-proc/backend/pod_reconciler_test.go @@ -0,0 +1,168 @@ +package backend + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + basePod1 = Pod{Name: "pod1", Address: ":8000"} + basePod2 = Pod{Name: "pod2", Address: ":8000"} + basePod3 = Pod{Name: "pod3", Address: ":8000"} +) + +func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { + tests := []struct { + name string + datastore *K8sDatastore + incomingPod *corev1.Pod + wantPods []string + }{ + { + name: "Add new pod", + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + inferencePool: &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", + }, + }, + }, + }, + incomingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + Labels: map[string]string{ + "some-key": "some-val", + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + wantPods: []string{basePod1.Name, basePod2.Name, basePod3.Name}, + }, + { + name: "New pod, not ready, valid selector", + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + inferencePool: &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", + }, + }, + }, + }, + incomingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + Labels: map[string]string{ + "some-key": "some-val", + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + wantPods: []string{basePod1.Name, basePod2.Name}, + }, + { + name: "Remove pod that does not match selector", + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + inferencePool: &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", + }, + }, + }, + }, + incomingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Labels: map[string]string{ + "some-wrong-key": "some-val", + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + wantPods: []string{basePod2.Name}, + }, + { + name: "Remove pod that is not ready", + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + inferencePool: &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", + }, + }, + }, + }, + incomingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Labels: map[string]string{ + "some-wrong-key": "some-val", + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + wantPods: []string{basePod2.Name}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + podReconciler := &PodReconciler{Datastore: test.datastore} + podReconciler.updateDatastore(test.incomingPod, test.datastore.inferencePool) + var gotPods []string + test.datastore.pods.Range(func(k, v any) bool { + pod := k.(Pod) + if v != nil { + gotPods = append(gotPods, pod.Name) + } + return true + }) + if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })) { + t.Errorf("got (%v) != want (%v);", gotPods, test.wantPods) + } + }) + } +} diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go index 94c6078c8..ae13105d8 100644 --- a/pkg/ext-proc/server/runserver.go +++ b/pkg/ext-proc/server/runserver.go @@ -95,13 +95,11 @@ func (r *ExtProcServerRunner) Setup() { klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err) } - if err := (&backend.EndpointSliceReconciler{ - Datastore: r.Datastore, - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Record: mgr.GetEventRecorderFor("endpointslice"), - ServiceName: r.ServiceName, - Zone: r.Zone, + if err := (&backend.PodReconciler{ + Datastore: r.Datastore, + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Record: mgr.GetEventRecorderFor("pod"), }).SetupWithManager(mgr); err != nil { klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err) }