diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index 3208be26b..be3c7f0b4 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -1,12 +1,16 @@ package backend import ( + "context" "errors" "math/rand" + "strconv" "sync" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" ) @@ -111,3 +115,41 @@ func IsCritical(model *v1alpha1.InferenceModel) bool { } return false } + +func (ds *K8sDatastore) LabelsMatch(podLabels map[string]string) bool { + poolSelector := selectorFromInferencePoolSelector(ds.inferencePool.Spec.Selector) + podSet := labels.Set(podLabels) + return poolSelector.Matches(podSet) +} + +func (ds *K8sDatastore) flushPodsAndRefetch(ctx context.Context, ctrlClient client.Client, newServerPool *v1alpha1.InferencePool) { + podList := &corev1.PodList{} + if err := ctrlClient.List(ctx, podList, &client.ListOptions{ + LabelSelector: selectorFromInferencePoolSelector(newServerPool.Spec.Selector), + Namespace: newServerPool.Namespace, + }); err != nil { + klog.Error(err, "error listing pods") + return + } + ds.pods.Clear() + + for _, k8sPod := range podList.Items { + pod := Pod{ + Name: k8sPod.Name, + Address: k8sPod.Status.PodIP + ":" + strconv.Itoa(int(newServerPool.Spec.TargetPortNumber)), + } + ds.pods.Store(pod, true) + } +} + +func selectorFromInferencePoolSelector(selector map[v1alpha1.LabelKey]v1alpha1.LabelValue) labels.Selector { + return labels.SelectorFromSet(stripLabelKeyAliasFromLabelMap(selector)) +} + +func stripLabelKeyAliasFromLabelMap(labels map[v1alpha1.LabelKey]v1alpha1.LabelValue) map[string]string { + outMap := make(map[string]string) + for k, v := range labels { + outMap[string(k)] = string(v) + } + return outMap +} diff --git 
a/pkg/ext-proc/backend/endpointslice_reconciler.go b/pkg/ext-proc/backend/endpointslice_reconciler.go deleted file mode 100644 index ebc182b8b..000000000 --- a/pkg/ext-proc/backend/endpointslice_reconciler.go +++ /dev/null @@ -1,109 +0,0 @@ -package backend - -import ( - "context" - "strconv" - "time" - - discoveryv1 "k8s.io/api/discovery/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" - klog "k8s.io/klog/v2" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/predicate" - "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" - logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" -) - -var ( - serviceOwnerLabel = "kubernetes.io/service-name" -) - -type EndpointSliceReconciler struct { - client.Client - Scheme *runtime.Scheme - Record record.EventRecorder - ServiceName string - Zone string - Datastore *K8sDatastore -} - -func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - inferencePool, err := c.Datastore.getInferencePool() - if err != nil { - klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err) - return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil - } - - klog.V(logutil.DEFAULT).Info("Reconciling EndpointSlice ", req.NamespacedName) - - endpointSlice := &discoveryv1.EndpointSlice{} - if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil { - klog.Errorf("Unable to get EndpointSlice: %v", err) - return ctrl.Result{}, err - } - c.updateDatastore(endpointSlice, inferencePool) - - return ctrl.Result{}, nil -} - -// TODO: Support multiple endpointslices for a single service -func (c *EndpointSliceReconciler) updateDatastore( - slice *discoveryv1.EndpointSlice, - inferencePool *v1alpha1.InferencePool) { - podMap := make(map[Pod]bool) - - for _, 
endpoint := range slice.Endpoints { - klog.V(logutil.DEFAULT).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint) - if c.validPod(endpoint) { - pod := Pod{ - Name: endpoint.TargetRef.Name, - Address: endpoint.Addresses[0] + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)), - } - podMap[pod] = true - klog.V(logutil.DEFAULT).Infof("Storing pod %v", pod) - c.Datastore.pods.Store(pod, true) - } - } - - removeOldPods := func(k, v any) bool { - pod, ok := k.(Pod) - if !ok { - klog.Errorf("Unable to cast key to Pod: %v", k) - return false - } - if _, ok := podMap[pod]; !ok { - klog.V(logutil.DEFAULT).Infof("Removing pod %v", pod) - c.Datastore.pods.Delete(pod) - } - return true - } - c.Datastore.pods.Range(removeOldPods) -} - -func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error { - ownsEndPointSlice := func(object client.Object) bool { - // Check if the object is an EndpointSlice - endpointSlice, ok := object.(*discoveryv1.EndpointSlice) - if !ok { - return false - } - - gotLabel := endpointSlice.ObjectMeta.Labels[serviceOwnerLabel] - wantLabel := c.ServiceName - return gotLabel == wantLabel - } - - return ctrl.NewControllerManagedBy(mgr). - For(&discoveryv1.EndpointSlice{}, - builder.WithPredicates(predicate.NewPredicateFuncs(ownsEndPointSlice))). 
- Complete(c) -} - -func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool { - validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone - return validZone && *endpoint.Conditions.Ready - -} diff --git a/pkg/ext-proc/backend/endpointslice_reconcilier_test.go b/pkg/ext-proc/backend/endpointslice_reconcilier_test.go deleted file mode 100644 index 9a3d55d8d..000000000 --- a/pkg/ext-proc/backend/endpointslice_reconcilier_test.go +++ /dev/null @@ -1,202 +0,0 @@ -package backend - -import ( - "sync" - "testing" - - v1 "k8s.io/api/core/v1" - discoveryv1 "k8s.io/api/discovery/v1" - "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" -) - -var ( - basePod1 = Pod{Name: "pod1"} - basePod2 = Pod{Name: "pod2"} - basePod3 = Pod{Name: "pod3"} -) - -func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { - tests := []struct { - name string - datastore *K8sDatastore - incomingSlice *discoveryv1.EndpointSlice - wantPods *sync.Map - }{ - { - name: "Add new pod", - datastore: &K8sDatastore{ - pods: populateMap(basePod1, basePod2), - inferencePool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - }, - }, - }, - incomingSlice: &discoveryv1.EndpointSlice{ - Endpoints: []discoveryv1.Endpoint{ - { - TargetRef: &v1.ObjectReference{ - Name: "pod1", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod2", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod3", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - }, - }, - wantPods: populateMap(basePod1, basePod2, basePod3), - }, - { - name: "New pod, but its not ready yet. 
Do not add.", - datastore: &K8sDatastore{ - pods: populateMap(basePod1, basePod2), - inferencePool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - }, - }, - }, - incomingSlice: &discoveryv1.EndpointSlice{ - Endpoints: []discoveryv1.Endpoint{ - { - TargetRef: &v1.ObjectReference{ - Name: "pod1", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod2", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod3", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: new(bool), - }, - Addresses: []string{"0.0.0.0"}, - }, - }, - }, - wantPods: populateMap(basePod1, basePod2), - }, - { - name: "Existing pod not ready, new pod added, and is ready", - datastore: &K8sDatastore{ - pods: populateMap(basePod1, basePod2), - inferencePool: &v1alpha1.InferencePool{ - Spec: v1alpha1.InferencePoolSpec{ - TargetPortNumber: int32(8000), - }, - }, - }, - incomingSlice: &discoveryv1.EndpointSlice{ - Endpoints: []discoveryv1.Endpoint{ - { - TargetRef: &v1.ObjectReference{ - Name: "pod1", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: new(bool), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod2", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - { - TargetRef: &v1.ObjectReference{ - Name: "pod3", - }, - Zone: new(string), - Conditions: discoveryv1.EndpointConditions{ - Ready: truePointer(), - }, - Addresses: []string{"0.0.0.0"}, - }, - }, - }, - wantPods: populateMap(basePod3, basePod2), - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - 
endpointSliceReconciler := &EndpointSliceReconciler{Datastore: test.datastore, Zone: ""} - endpointSliceReconciler.updateDatastore(test.incomingSlice, test.datastore.inferencePool) - - if mapsEqual(endpointSliceReconciler.Datastore.pods, test.wantPods) { - t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", - endpointSliceReconciler.Datastore.pods, - test.wantPods) - } - }) - } -} - -func mapsEqual(map1, map2 *sync.Map) bool { - equal := true - - map1.Range(func(k, v any) bool { - if _, ok := map2.Load(k); !ok { - equal = false - return false - } - return true - }) - map2.Range(func(k, v any) bool { - if _, ok := map1.Load(k); !ok { - equal = false - return false - } - return true - }) - - return equal -} - -func truePointer() *bool { - primitivePointersAreSilly := true - return &primitivePointersAreSilly -} diff --git a/pkg/ext-proc/backend/inferencemodel_reconciler_test.go b/pkg/ext-proc/backend/inferencemodel_reconciler_test.go index 415358b2d..c5ef8d147 100644 --- a/pkg/ext-proc/backend/inferencemodel_reconciler_test.go +++ b/pkg/ext-proc/backend/inferencemodel_reconciler_test.go @@ -309,3 +309,24 @@ func populateServiceMap(services ...*v1alpha1.InferenceModel) *sync.Map { } return returnVal } + +func mapsEqual(map1, map2 *sync.Map) bool { + equal := true + + map1.Range(func(k, v any) bool { + if _, ok := map2.Load(k); !ok { + equal = false + return false + } + return true + }) + map2.Range(func(k, v any) bool { + if _, ok := map1.Load(k); !ok { + equal = false + return false + } + return true + }) + + return equal +} diff --git a/pkg/ext-proc/backend/inferencepool_reconciler.go b/pkg/ext-proc/backend/inferencepool_reconciler.go index b4cba2020..fd15ebc33 100644 --- a/pkg/ext-proc/backend/inferencepool_reconciler.go +++ b/pkg/ext-proc/backend/inferencepool_reconciler.go @@ -2,6 +2,7 @@ package backend import ( "context" + "reflect" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" @@ -21,7 +22,6 @@ type InferencePoolReconciler 
struct { Record record.EventRecorder PoolNamespacedName types.NamespacedName Datastore *K8sDatastore - Zone string } func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -32,11 +32,15 @@ func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques serverPool := &v1alpha1.InferencePool{} if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil { - klog.Error(err, "unable to get InferencePool") + klog.Error(err, ": unable to get InferencePool") return ctrl.Result{}, err } - - c.updateDatastore(serverPool) + if c.Datastore.inferencePool == nil || !reflect.DeepEqual(serverPool.Spec.Selector, c.Datastore.inferencePool.Spec.Selector) { + c.updateDatastore(serverPool) + c.Datastore.flushPodsAndRefetch(ctx, c.Client, serverPool) + } else { + c.updateDatastore(serverPool) + } return ctrl.Result{}, nil } diff --git a/pkg/ext-proc/backend/pod_reconciler.go b/pkg/ext-proc/backend/pod_reconciler.go new file mode 100644 index 000000000..60d014ce6 --- /dev/null +++ b/pkg/ext-proc/backend/pod_reconciler.go @@ -0,0 +1,80 @@ +package backend + +import ( + "context" + "strconv" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" + logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" +) + +type PodReconciler struct { + client.Client + Datastore *K8sDatastore + Scheme *runtime.Scheme + Record record.EventRecorder +} + +func (c *PodReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + inferencePool, err := c.Datastore.getInferencePool() + if err != nil { + klog.V(logutil.DEFAULT).Infof("Skipping reconciling Pod because the InferencePool is not available yet: %v", err) + // When the 
inferencePool is initialized it lists the appropriate pods and populates the datastore, so no need to requeue. + return ctrl.Result{}, nil + } else if inferencePool.Namespace != req.Namespace { + return ctrl.Result{}, nil + } + + klog.V(logutil.VERBOSE).Infof("reconciling Pod %v", req.NamespacedName) + + pod := &corev1.Pod{} + if err := c.Get(ctx, req.NamespacedName, pod); err != nil { + if apierrors.IsNotFound(err) { + c.Datastore.pods.Range(func(k, v any) bool { if p, ok := k.(Pod); ok && p.Name == req.Name { c.Datastore.pods.Delete(k) }; return true }) + return ctrl.Result{}, nil + } + klog.Error(err, ": unable to get pod") + return ctrl.Result{}, err + } + + c.updateDatastore(pod, inferencePool) + + return ctrl.Result{}, nil +} + +func (c *PodReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Pod{}). + Complete(c) +} + +func (c *PodReconciler) updateDatastore(k8sPod *corev1.Pod, inferencePool *v1alpha1.InferencePool) { + pod := Pod{ + Name: k8sPod.Name, + Address: k8sPod.Status.PodIP + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)), + } + if !k8sPod.DeletionTimestamp.IsZero() || !c.Datastore.LabelsMatch(k8sPod.ObjectMeta.Labels) || !podIsReady(k8sPod) { + c.Datastore.pods.Delete(pod) + } else { + c.Datastore.pods.Store(pod, true) + } +} + +func podIsReady(pod *corev1.Pod) bool { + for _, condition := range pod.Status.Conditions { + if condition.Type == corev1.PodReady { + if condition.Status == corev1.ConditionTrue { + return true + } + break + } + } + return false +} diff --git a/pkg/ext-proc/backend/pod_reconciler_test.go b/pkg/ext-proc/backend/pod_reconciler_test.go new file mode 100644 index 000000000..42d6d8e42 --- /dev/null +++ b/pkg/ext-proc/backend/pod_reconciler_test.go @@ -0,0 +1,168 @@ +package backend + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1" +) + +var ( + basePod1 = 
Pod{Name: "pod1", Address: ":8000"} + basePod2 = Pod{Name: "pod2", Address: ":8000"} + basePod3 = Pod{Name: "pod3", Address: ":8000"} +) + +func TestUpdateDatastore_PodReconciler(t *testing.T) { + tests := []struct { + name string + datastore *K8sDatastore + incomingPod *corev1.Pod + wantPods []string + }{ + { + name: "Add new pod", + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + inferencePool: &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", + }, + }, + }, + }, + incomingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + Labels: map[string]string{ + "some-key": "some-val", + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + wantPods: []string{basePod1.Name, basePod2.Name, basePod3.Name}, + }, + { + name: "New pod, not ready, valid selector", + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + inferencePool: &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", + }, + }, + }, + }, + incomingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod3", + Labels: map[string]string{ + "some-key": "some-val", + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + wantPods: []string{basePod1.Name, basePod2.Name}, + }, + { + name: "Remove pod that does not match selector", + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + inferencePool: &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", + }, + }, + }, + }, + 
incomingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Labels: map[string]string{ + "some-wrong-key": "some-val", + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + }, + wantPods: []string{basePod2.Name}, + }, + { + name: "Remove pod that is not ready", + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + inferencePool: &v1alpha1.InferencePool{ + Spec: v1alpha1.InferencePoolSpec{ + TargetPortNumber: int32(8000), + Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{ + "some-key": "some-val", + }, + }, + }, + }, + incomingPod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + Labels: map[string]string{ + "some-wrong-key": "some-val", + }, + }, + Status: corev1.PodStatus{ + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionFalse, + }, + }, + }, + }, + wantPods: []string{basePod2.Name}, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + podReconciler := &PodReconciler{Datastore: test.datastore} + podReconciler.updateDatastore(test.incomingPod, test.datastore.inferencePool) + var gotPods []string + test.datastore.pods.Range(func(k, v any) bool { + pod := k.(Pod) + if v != nil { + gotPods = append(gotPods, pod.Name) + } + return true + }) + if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })) { + t.Errorf("got (%v) != want (%v);", gotPods, test.wantPods) + } + }) + } +} diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index 634c3581e..8442dade0 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -53,14 +53,6 @@ var ( "poolNamespace", runserver.DefaultPoolNamespace, "Namespace of the InferencePool this Endpoint Picker is associated with.") - serviceName = flag.String( - "serviceName", - runserver.DefaultServiceName, - "Name of the Service that will be used to read EndpointSlices 
from") - zone = flag.String( - "zone", - runserver.DefaultZone, - "The zone that this instance is created in. Will be passed to the corresponding endpointSlice. ") refreshPodsInterval = flag.Duration( "refreshPodsInterval", runserver.DefaultRefreshPodsInterval, @@ -110,8 +102,6 @@ func main() { TargetEndpointKey: *targetEndpointKey, PoolName: *poolName, PoolNamespace: *poolNamespace, - ServiceName: *serviceName, - Zone: *zone, RefreshPodsInterval: *refreshPodsInterval, RefreshMetricsInterval: *refreshMetricsInterval, RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval, @@ -221,9 +211,5 @@ func validateFlags() error { return fmt.Errorf("required %q flag not set", "poolName") } - if *serviceName == "" { - return fmt.Errorf("required %q flag not set", "serviceName") - } - return nil } diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go index affb4b6cb..4c670a6b8 100644 --- a/pkg/ext-proc/server/runserver.go +++ b/pkg/ext-proc/server/runserver.go @@ -23,8 +23,6 @@ type ExtProcServerRunner struct { TargetEndpointKey string PoolName string PoolNamespace string - ServiceName string - Zone string RefreshPodsInterval time.Duration RefreshMetricsInterval time.Duration RefreshPrometheusMetricsInterval time.Duration @@ -40,8 +38,6 @@ const ( DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey DefaultPoolName = "" // required but no default DefaultPoolNamespace = "default" // default for --poolNamespace - DefaultServiceName = "" // required but no default - DefaultZone = "" // default for --zone DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval @@ -53,8 +49,6 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner { TargetEndpointKey: 
DefaultTargetEndpointKey, PoolName: DefaultPoolName, PoolNamespace: DefaultPoolNamespace, - ServiceName: DefaultServiceName, - Zone: DefaultZone, RefreshPodsInterval: DefaultRefreshPodsInterval, RefreshMetricsInterval: DefaultRefreshMetricsInterval, RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, @@ -98,13 +92,11 @@ func (r *ExtProcServerRunner) Setup() { klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err) } - if err := (&backend.EndpointSliceReconciler{ - Datastore: r.Datastore, - Scheme: mgr.GetScheme(), - Client: mgr.GetClient(), - Record: mgr.GetEventRecorderFor("endpointslice"), - ServiceName: r.ServiceName, - Zone: r.Zone, + if err := (&backend.PodReconciler{ + Datastore: r.Datastore, + Scheme: mgr.GetScheme(), + Client: mgr.GetClient(), + Record: mgr.GetEventRecorderFor("pod"), }).SetupWithManager(mgr); err != nil { klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err) } diff --git a/pkg/manifests/ext_proc.yaml b/pkg/manifests/ext_proc.yaml index a7dc7678c..49145d24c 100644 --- a/pkg/manifests/ext_proc.yaml +++ b/pkg/manifests/ext_proc.yaml @@ -78,8 +78,6 @@ spec: - "vllm-llama2-7b-pool" - -v - "3" - - -serviceName - - "vllm-llama2-7b-pool" - -grpcPort - "9002" - -grpcHealthPort