diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go index a7cf54a89..abeced3f1 100644 --- a/pkg/ext-proc/backend/datastore.go +++ b/pkg/ext-proc/backend/datastore.go @@ -1,6 +1,7 @@ package backend import ( + "fmt" "math/rand" "sync" @@ -9,16 +10,57 @@ import ( "k8s.io/klog/v2" ) +func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore { + store := &K8sDatastore{ + poolMu: sync.RWMutex{}, + llmServices: &sync.Map{}, + pods: &sync.Map{}, + } + for _, opt := range options { + opt(store) + } + return store +} + // The datastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from k8s-api) type K8sDatastore struct { - LLMServerPool *v1alpha1.LLMServerPool - LLMServices *sync.Map - Pods *sync.Map + // poolMu is used to synchronize access to the llmServerPool. + poolMu sync.RWMutex + llmServerPool *v1alpha1.LLMServerPool + llmServices *sync.Map + pods *sync.Map +} + +type K8sDatastoreOption func(*K8sDatastore) + +// WithPods can be used in tests to override the pods. +func WithPods(pods []*PodMetrics) K8sDatastoreOption { + return func(store *K8sDatastore) { + store.pods = &sync.Map{} + for _, pod := range pods { + store.pods.Store(pod.Pod, true) + } + } +} + +func (ds *K8sDatastore) setLLMServerPool(pool *v1alpha1.LLMServerPool) { + ds.poolMu.Lock() + defer ds.poolMu.Unlock() + ds.llmServerPool = pool +} + +func (ds *K8sDatastore) getLLMServerPool() (*v1alpha1.LLMServerPool, error) { + ds.poolMu.RLock() + defer ds.poolMu.RUnlock() + if ds.llmServerPool == nil { + return nil, fmt.Errorf("LLMServerPool hasn't been initialized yet") + } + return ds.llmServerPool, nil } func (ds *K8sDatastore) GetPodIPs() []string { var ips []string - ds.Pods.Range(func(name, pod any) bool { + ds.pods.Range(func(name, pod any) bool { ips = append(ips, pod.(*corev1.Pod).Status.PodIP) return true }) @@ -26,7 +68,7 @@ func (ds *K8sDatastore) GetPodIPs() []string { } func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.Model) { - s.LLMServices.Range(func(k, v any) bool { + s.llmServices.Range(func(k, v any) bool { service := v.(*v1alpha1.LLMService) klog.V(3).Infof("Service name: %v", service.Name) for _, model := range service.Spec.Models { diff --git a/pkg/ext-proc/backend/datastore_test.go b/pkg/ext-proc/backend/datastore_test.go index b47db1a57..8ac1e339a 100644 --- a/pkg/ext-proc/backend/datastore_test.go +++ b/pkg/ext-proc/backend/datastore_test.go @@ -10,10 +10,9 @@ var () func TestRandomWeightedDraw(t *testing.T) { tests := []struct { - name string - datastore K8sDatastore - model *v1alpha1.Model - want string + name string + model *v1alpha1.Model + want string }{ { name: "'random' distribution", diff --git a/pkg/ext-proc/backend/endpointslice_reconciler.go b/pkg/ext-proc/backend/endpointslice_reconciler.go index 06c4faad8..e7911d776 100644 --- a/pkg/ext-proc/backend/endpointslice_reconciler.go +++ b/pkg/ext-proc/backend/endpointslice_reconciler.go @@ -9,7 +9,9 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" ) var ( @@ -28,18 +30,14 @@ type EndpointSliceReconciler struct { } func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - klog.V(1).Info("reconciling EndpointSlice ", req.NamespacedName) + klog.V(2).Info("Reconciling EndpointSlice ", req.NamespacedName) endpointSlice := &discoveryv1.EndpointSlice{} if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil { - klog.Error(err, "unable to get LLMServerPool") + klog.Errorf("Unable to get EndpointSlice: %v", err) return ctrl.Result{}, err } - if !c.ownsEndPointSlice(endpointSlice.ObjectMeta.Labels) { - return ctrl.Result{}, nil - } - c.updateDatastore(endpointSlice) return ctrl.Result{}, nil @@ -50,9 +48,9 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli for _, endpoint := range slice.Endpoints { klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint) if c.validPod(endpoint) { - pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.LLMServerPool.Spec.TargetPort)} + pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.llmServerPool.Spec.TargetPort)} podMap[pod] = true - c.Datastore.Pods.Store(pod, true) + c.Datastore.pods.Store(pod, true) } } @@ -63,23 +61,37 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli return false } if _, ok := podMap[pod]; !ok { - c.Datastore.Pods.Delete(pod) + c.Datastore.pods.Delete(pod) } return true } - c.Datastore.Pods.Range(removeOldPods) + c.Datastore.pods.Range(removeOldPods) } func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error { + llmServerPoolAvailable := func(object client.Object) bool { + _, err := c.Datastore.getLLMServerPool() + if err != nil { + klog.Warningf("Skipping reconciling EndpointSlice because LLMServerPool is not available yet: %v", err) + } + return err == nil + } + + ownsEndPointSlice := func(object client.Object) bool { + // Check if the object is an EndpointSlice + endpointSlice, ok := object.(*discoveryv1.EndpointSlice) + if !ok { + return false + } + + return endpointSlice.ObjectMeta.Labels[serviceOwnerLabel] == c.ServiceName + } + return ctrl.NewControllerManagedBy(mgr). - For(&discoveryv1.EndpointSlice{}). + For(&discoveryv1.EndpointSlice{}, builder.WithPredicates(predicate.NewPredicateFuncs(llmServerPoolAvailable), predicate.NewPredicateFuncs(ownsEndPointSlice))). Complete(c) } -func (c *EndpointSliceReconciler) ownsEndPointSlice(labels map[string]string) bool { - return labels[serviceOwnerLabel] == c.ServiceName -} - func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool { validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone return validZone && *endpoint.Conditions.Ready == true diff --git a/pkg/ext-proc/backend/endpointslice_reconcilier_test.go b/pkg/ext-proc/backend/endpointslice_reconcilier_test.go index f965d3e19..f0d1552e8 100644 --- a/pkg/ext-proc/backend/endpointslice_reconcilier_test.go +++ b/pkg/ext-proc/backend/endpointslice_reconcilier_test.go @@ -18,15 +18,15 @@ var ( func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { tests := []struct { name string - datastore K8sDatastore + datastore *K8sDatastore incomingSlice *discoveryv1.EndpointSlice - want K8sDatastore + wantPods *sync.Map }{ { name: "Add new pod", - datastore: K8sDatastore{ - Pods: populateMap(basePod1, basePod2), - LLMServerPool: &v1alpha1.LLMServerPool{ + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + llmServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ TargetPort: int32(8000), }, @@ -66,15 +66,13 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { }, }, }, - want: K8sDatastore{ - Pods: populateMap(basePod1, basePod2, basePod3), - }, + wantPods: populateMap(basePod1, basePod2, basePod3), }, { name: "New pod, but its not ready yet. Do not add.", - datastore: K8sDatastore{ - Pods: populateMap(basePod1, basePod2), - LLMServerPool: &v1alpha1.LLMServerPool{ + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + llmServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ TargetPort: int32(8000), }, @@ -114,15 +112,13 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { }, }, }, - want: K8sDatastore{ - Pods: populateMap(basePod1, basePod2), - }, + wantPods: populateMap(basePod1, basePod2), }, { name: "Existing pod not ready, new pod added, and is ready", - datastore: K8sDatastore{ - Pods: populateMap(basePod1, basePod2), - LLMServerPool: &v1alpha1.LLMServerPool{ + datastore: &K8sDatastore{ + pods: populateMap(basePod1, basePod2), + llmServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ TargetPort: int32(8000), }, @@ -162,18 +158,16 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) { }, }, }, - want: K8sDatastore{ - Pods: populateMap(basePod3, basePod2), - }, + wantPods: populateMap(basePod3, basePod2), }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - endpointSliceReconciler := &EndpointSliceReconciler{Datastore: &test.datastore, Zone: ""} + endpointSliceReconciler := &EndpointSliceReconciler{Datastore: test.datastore, Zone: ""} endpointSliceReconciler.updateDatastore(test.incomingSlice) - if mapsEqual(endpointSliceReconciler.Datastore.Pods, test.want.Pods) { - t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.Pods, test.want.Pods) + if mapsEqual(endpointSliceReconciler.Datastore.pods, test.wantPods) { + t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.pods, test.wantPods) } }) } diff --git a/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go b/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go deleted file mode 100644 index 2e3163417..000000000 --- a/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go +++ /dev/null @@ -1,97 +0,0 @@ -package backend - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func TestUpdateDatastore_LLMServerPoolReconciler(t *testing.T) { - tests := []struct { - name string - datastore K8sDatastore - incomingServerPool *v1alpha1.LLMServerPool - want K8sDatastore - }{ - { - name: "Update to new, fresh LLMServerPool", - datastore: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"app": "vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "Old and boring", - }, - }, - }, - incomingServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"app": "not-vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "New and fun", - }, - }, - want: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"app": "not-vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "New and fun", - }, - }, - }, - }, - { - name: "Do not update, resource version the same", - datastore: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"app": "vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "Old and boring", - }, - }, - }, - incomingServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"technically": "this-should-never-happen"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "Old and boring", - }, - }, - want: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"app": "vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "Old and boring", - }, - }, - }, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - llmServerPoolReconciler := &LLMServerPoolReconciler{Datastore: &test.datastore} - llmServerPoolReconciler.updateDatastore(test.incomingServerPool) - - if diff := cmp.Diff(test.want.LLMServerPool, llmServerPoolReconciler.Datastore.LLMServerPool); diff != "" { - t.Errorf("Unexpected output (-want +got): %v", diff) - } - }) - } -} diff --git a/pkg/ext-proc/backend/llmserverpool_reconciler.go b/pkg/ext-proc/backend/llmserverpool_reconciler.go index d6ca67681..640b48996 100644 --- a/pkg/ext-proc/backend/llmserverpool_reconciler.go +++ b/pkg/ext-proc/backend/llmserverpool_reconciler.go @@ -33,25 +33,20 @@ func (c *LLMServerPoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques if req.NamespacedName.Name != c.ServerPoolName && req.NamespacedName.Namespace != c.Namespace { return ctrl.Result{}, nil } - klog.V(1).Info("reconciling LLMServerPool", req.NamespacedName) + klog.V(2).Infof("Reconciling LLMServerPool %v", req.NamespacedName) serverPool := &v1alpha1.LLMServerPool{} if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil { - klog.Error(err, "unable to get LLMServerPool") + klog.Errorf("Unable to get LLMServerPool: %v", err) return ctrl.Result{}, err } - c.updateDatastore(serverPool) + klog.V(2).Infof("Updated LLMServerPool: %+v", serverPool) + c.Datastore.setLLMServerPool(serverPool) return ctrl.Result{}, nil } -func (c *LLMServerPoolReconciler) updateDatastore(serverPool *v1alpha1.LLMServerPool) { - if c.Datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.Datastore.LLMServerPool.ObjectMeta.ResourceVersion { - c.Datastore.LLMServerPool = serverPool - } -} - func (c *LLMServerPoolReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.LLMServerPool{}). diff --git a/pkg/ext-proc/backend/llmservice_reconciler.go b/pkg/ext-proc/backend/llmservice_reconciler.go index 40c83bfeb..3fb230dee 100644 --- a/pkg/ext-proc/backend/llmservice_reconciler.go +++ b/pkg/ext-proc/backend/llmservice_reconciler.go @@ -22,14 +22,11 @@ type LLMServiceReconciler struct { } func (c *LLMServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - if req.Namespace != c.Namespace { - return ctrl.Result{}, nil - } - klog.V(1).Info("reconciling LLMService", req.NamespacedName) + klog.V(2).Infof("Reconciling LLMService %v", req.NamespacedName) service := &v1alpha1.LLMService{} if err := c.Get(ctx, req.NamespacedName, service); err != nil { - klog.Error(err, "unable to get LLMServerPool") + klog.Errorf("Unable to get LLMServerPool: %v", err) return ctrl.Result{}, err } @@ -46,12 +43,13 @@ func (c *LLMServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { func (c *LLMServiceReconciler) updateDatastore(service *v1alpha1.LLMService) { for _, ref := range service.Spec.PoolRef { if strings.Contains(strings.ToLower(ref.Kind), strings.ToLower("LLMServerPool")) && ref.Name == c.ServerPoolName { - klog.V(2).Infof("Adding/Updating service: %v", service.Name) - c.Datastore.LLMServices.Store(service.Name, service) + klog.V(2).Infof("Adding/Updating service: %+v", service) + c.Datastore.llmServices.Store(service.Name, service) return } } - klog.V(2).Infof("Removing/Not adding service: %v", service.Name) - // If we get here. The service is not relevant to this pool, remove. - c.Datastore.LLMServices.Delete(service.Name) + klog.V(2).Infof("Removing/Not adding service: %+v", service) + // The LLMService may have changed to a different pool. Remove such services. + // Otherwise this is a noop. + c.Datastore.llmServices.Delete(service.Name) } diff --git a/pkg/ext-proc/backend/llmservice_reconciler_test.go b/pkg/ext-proc/backend/llmservice_reconciler_test.go index b5c502889..f36c43720 100644 --- a/pkg/ext-proc/backend/llmservice_reconciler_test.go +++ b/pkg/ext-proc/backend/llmservice_reconciler_test.go @@ -69,14 +69,14 @@ var ( func TestUpdateDatastore_LLMServiceReconciler(t *testing.T) { tests := []struct { name string - datastore K8sDatastore + datastore *K8sDatastore incomingService *v1alpha1.LLMService - want K8sDatastore + wantLLMService *sync.Map }{ { name: "No Services registered; valid, new service incoming.", - datastore: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ + datastore: &K8sDatastore{ + llmServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ ModelServerSelector: map[string]string{"app": "vllm"}, }, @@ -85,26 +85,15 @@ func TestUpdateDatastore_LLMServiceReconciler(t *testing.T) { ResourceVersion: "Old and boring", }, }, - LLMServices: &sync.Map{}, + llmServices: &sync.Map{}, }, incomingService: service1, - want: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"app": "not-vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "New and fun", - }, - }, - LLMServices: populateServiceMap(service1), - }, + wantLLMService: populateServiceMap(service1), }, { name: "Removing existing service.", - datastore: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ + datastore: &K8sDatastore{ + llmServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ ModelServerSelector: map[string]string{"app": "vllm"}, }, @@ -113,26 +102,15 @@ func TestUpdateDatastore_LLMServiceReconciler(t *testing.T) { ResourceVersion: "Old and boring", }, }, - LLMServices: populateServiceMap(service1), + llmServices: populateServiceMap(service1), }, incomingService: service1Modified, - want: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"app": "not-vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "New and fun", - }, - }, - LLMServices: populateServiceMap(), - }, + wantLLMService: populateServiceMap(), }, { name: "Unrelated service, do nothing.", - datastore: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ + datastore: &K8sDatastore{ + llmServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ ModelServerSelector: map[string]string{"app": "vllm"}, }, @@ -141,7 +119,7 @@ func TestUpdateDatastore_LLMServiceReconciler(t *testing.T) { ResourceVersion: "Old and boring", }, }, - LLMServices: populateServiceMap(service1), + llmServices: populateServiceMap(service1), }, incomingService: &v1alpha1.LLMService{ Spec: v1alpha1.LLMServiceSpec{ @@ -161,23 +139,12 @@ func TestUpdateDatastore_LLMServiceReconciler(t *testing.T) { Name: "unrelated-service", }, }, - want: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"app": "not-vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "New and fun", - }, - }, - LLMServices: populateServiceMap(service1), - }, + wantLLMService: populateServiceMap(service1), }, { name: "Add to existing", - datastore: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ + datastore: &K8sDatastore{ + llmServerPool: &v1alpha1.LLMServerPool{ Spec: v1alpha1.LLMServerPoolSpec{ ModelServerSelector: map[string]string{"app": "vllm"}, }, @@ -186,29 +153,18 @@ func TestUpdateDatastore_LLMServiceReconciler(t *testing.T) { ResourceVersion: "Old and boring", }, }, - LLMServices: populateServiceMap(service1), + llmServices: populateServiceMap(service1), }, incomingService: service2, - want: K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{ - Spec: v1alpha1.LLMServerPoolSpec{ - ModelServerSelector: map[string]string{"app": "not-vllm"}, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: "test-pool", - ResourceVersion: "New and fun", - }, - }, - LLMServices: populateServiceMap(service1, service2), - }, + wantLLMService: populateServiceMap(service1, service2), }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - llmServiceReconciler := &LLMServiceReconciler{Datastore: &test.datastore, ServerPoolName: test.datastore.LLMServerPool.Name} + llmServiceReconciler := &LLMServiceReconciler{Datastore: test.datastore, ServerPoolName: test.datastore.llmServerPool.Name} llmServiceReconciler.updateDatastore(test.incomingService) - if ok := mapsEqual(llmServiceReconciler.Datastore.LLMServices, test.want.LLMServices); !ok { + if ok := mapsEqual(llmServiceReconciler.Datastore.llmServices, test.wantLLMService); !ok { t.Error("Maps are not equal") } }) diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go index a2023b8e4..8ad491a67 100644 --- a/pkg/ext-proc/backend/provider.go +++ b/pkg/ext-proc/backend/provider.go @@ -121,13 +121,13 @@ func (p *Provider) refreshPodsOnce() error { // remove pods that don't exist any more. mergeFn := func(k, v any) bool { pod := k.(Pod) - if _, ok := p.datastore.Pods.Load(pod); !ok { + if _, ok := p.datastore.pods.Load(pod); !ok { p.podMetrics.Delete(pod) } return true } p.podMetrics.Range(mergeFn) - p.datastore.Pods.Range(addNewPods) + p.datastore.pods.Range(addNewPods) return nil } diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go index b48921757..ad231f575 100644 --- a/pkg/ext-proc/backend/provider_test.go +++ b/pkg/ext-proc/backend/provider_test.go @@ -48,7 +48,7 @@ func TestProvider(t *testing.T) { { name: "Init success", datastore: &K8sDatastore{ - Pods: populateMap(pod1.Pod, pod2.Pod), + pods: populateMap(pod1.Pod, pod2.Pod), }, pmc: &FakePodMetricsClient{ Res: map[Pod]*PodMetrics{ @@ -69,7 +69,7 @@ func TestProvider(t *testing.T) { }, }, datastore: &K8sDatastore{ - Pods: populateMap(pod1.Pod, pod2.Pod), + pods: populateMap(pod1.Pod, pod2.Pod), }, want: []*PodMetrics{ pod1, diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go index f1eed9e39..d7ec5e9db 100644 --- a/pkg/ext-proc/handlers/request.go +++ b/pkg/ext-proc/handlers/request.go @@ -40,13 +40,15 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces // This might be a security risk in the future where adapters not registered in the LLMService // are able to be requested by using their distinct name. modelObj := s.datastore.FetchModelData(model) - if modelObj != nil && len(modelObj.TargetModels) > 0 { + if modelObj == nil { + return nil, fmt.Errorf("error finding a model object in LLMService for input model %v", model) + } + if len(modelObj.TargetModels) > 0 { modelName = backend.RandomWeightedDraw(modelObj, 0) if modelName == "" { return nil, fmt.Errorf("error getting target model name for model %v", modelObj.Name) } } - klog.V(3).Infof("Model is null %v", modelObj == nil) llmReq := &scheduling.LLMRequest{ Model: model, ResolvedTargetModel: modelName, diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index d949e9f44..8ef29bbe4 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -7,7 +7,6 @@ import ( "net" "os" "os/signal" - "sync" "syscall" "time" @@ -64,17 +63,20 @@ func main() { ctrl.SetLogger(klog.TODO()) + // Print all flag values + flags := "Flags: " + flag.VisitAll(func(f *flag.Flag) { + flags += fmt.Sprintf("%s=%v; ", f.Name, f.Value) + }) + klog.Info(flags) + klog.Infof("Listening on %q", fmt.Sprintf(":%d", *port)) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port)) if err != nil { klog.Fatalf("failed to listen: %v", err) } - datastore := &backend.K8sDatastore{ - LLMServerPool: &v1alpha1.LLMServerPool{}, - LLMServices: &sync.Map{}, - Pods: &sync.Map{}, - } + datastore := backend.NewK8sDataStore() mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, diff --git a/pkg/ext-proc/test/utils.go b/pkg/ext-proc/test/utils.go index ef031e9db..22a60347a 100644 --- a/pkg/ext-proc/test/utils.go +++ b/pkg/ext-proc/test/utils.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "net" - "sync" "time" "google.golang.org/grpc" @@ -27,7 +26,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Dur pms[pod.Pod] = pod } pmc := &backend.FakePodMetricsClient{Res: pms} - pp := backend.NewProvider(pmc, &backend.K8sDatastore{Pods: populatePodDatastore(pods)}) + pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods))) if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil { klog.Fatalf("failed to initialize: %v", err) } @@ -79,12 +78,3 @@ func FakePod(index int) backend.Pod { } return pod } - -func populatePodDatastore(pods []*backend.PodMetrics) *sync.Map { - returnVal := &sync.Map{} - - for _, pod := range pods { - returnVal.Store(pod.Pod, true) - } - return returnVal -}