
Bug fixes: 1. NPE when model is not found 2. Port is considered 0 when LLMServerPool is not initialized #79


Merged 2 commits on Dec 10, 2024
52 changes: 47 additions & 5 deletions pkg/ext-proc/backend/datastore.go
@@ -1,6 +1,7 @@
package backend

import (
"fmt"
"math/rand"
"sync"

@@ -9,24 +10,65 @@ import (
"k8s.io/klog/v2"
)

func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
store := &K8sDatastore{
poolMu: sync.RWMutex{},
llmServices: &sync.Map{},
pods: &sync.Map{},
}
for _, opt := range options {
opt(store)
}
return store
}

// The datastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from k8s-api)
type K8sDatastore struct {
LLMServerPool *v1alpha1.LLMServerPool
LLMServices *sync.Map
Pods *sync.Map
// poolMu is used to synchronize access to the llmServerPool.
poolMu sync.RWMutex
llmServerPool *v1alpha1.LLMServerPool
Collaborator:

I don't think I agree with keeping these properties private. This assumes that access will only ever be from the backend package and I'm not sure that will be true. This package is already rather a catchall and could use some refactoring.

Contributor Author:

> This assumes that access will only ever be from the backend package and I'm not sure that will be true

Then you can make it public or expose a public method, which is what I am doing here. I have synchronized access to the fields so other packages won't mess with them.

The bug is that main.go sets an empty LLMServerPool, which the EndpointSliceReconciler later uses with the default port value 0. Making the fields private prevents such bugs.
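To make the failure mode concrete, here is a minimal sketch of the zero-value behavior, using hypothetical stand-in types rather than the real v1alpha1 definitions:

```go
package main

import "fmt"

// Hypothetical stand-ins for the CRD types, only to show the zero value.
type LLMServerPoolSpec struct {
	TargetPort int32
}

type LLMServerPool struct {
	Spec LLMServerPoolSpec
}

func main() {
	// A non-nil but never-populated pool: TargetPort is the int32 zero value.
	pool := &LLMServerPool{}
	// The shape of the bug: endpoints get addressed with port 0.
	fmt.Printf("pod address: 10.0.0.1:%d\n", pool.Spec.TargetPort) // prints pod address: 10.0.0.1:0
}
```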

llmServices *sync.Map
pods *sync.Map
}

type K8sDatastoreOption func(*K8sDatastore)

// WithPods can be used in tests to override the pods.
func WithPods(pods []*PodMetrics) K8sDatastoreOption {
return func(store *K8sDatastore) {
store.pods = &sync.Map{}
for _, pod := range pods {
store.pods.Store(pod.Pod, true)
}
}
}
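A hypothetical in-package test usage of this option, assuming the Pod and PodMetrics shapes used elsewhere in this diff:

```go
func TestWithPods(t *testing.T) {
	// Seed the datastore with a fake pod instead of a live informer.
	pod := Pod{Name: "pod-1", Address: "10.0.0.1:8000"}
	store := NewK8sDataStore(WithPods([]*PodMetrics{{Pod: pod}}))
	if _, ok := store.pods.Load(pod); !ok {
		t.Fatal("expected pod-1 to be seeded")
	}
}
```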

func (ds *K8sDatastore) setLLMServerPool(pool *v1alpha1.LLMServerPool) {
ds.poolMu.Lock()
Collaborator:

Have we experienced, or do we expect, a race condition with the serverPool?

Contributor Author:

Yes, it happens pretty often actually. If you try restarting the EPP many times, you can probably hit it.
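For anyone trying to reproduce it outside the project, a stripped-down sketch of this class of race (not this repo's types); `go run -race` can flag the unsynchronized pointer access:

```go
package main

type serverPool struct{ targetPort int32 }

func main() {
	var pool *serverPool
	done := make(chan struct{})
	go func() {
		// Writer, playing the role of the LLMServerPool reconciler.
		pool = &serverPool{targetPort: 8000}
		close(done)
	}()
	// Reader, playing the role of the EndpointSlice reconciler: no lock,
	// so this read races with the write above.
	if pool != nil {
		_ = pool.targetPort
	}
	<-done
}
```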

Collaborator:

Yes, I agree that the startup race condition is annoying, and this PR resolves that in your predicate func. But I think this is unrelated. I suppose it's not a big deal, and it can keep us safer. Getters/setters can just add code bloat and complicate simple logic. I'm tying this into the export comment thread as well, based on the same idea.

Contributor Author:

Yeah, it's common practice to hide internal fields until they need to be exported; it reduces the chances of bugs. The llmServerPool field is particularly important here because I need to synchronize access to it.

Collaborator:

Yeah, I think that's more Java/C# style, but I won't dig too hard here.

Contributor Author:

Go doesn't encourage the "GetXX" naming, so in this case I should name it "LLMServerPool" instead of "GetLLMServerPool". But the getter/setter pattern is common in Go as well; see the "Getters" section of https://go.dev/doc/effective_go.
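For reference, the convention from that section looks like this (illustrative only, not code from this PR):

```go
type user struct {
	owner string
}

// Per Effective Go: the getter omits the "Get" prefix; the setter keeps "Set".
func (u *user) Owner() string     { return u.owner }
func (u *user) SetOwner(s string) { u.owner = s }
```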

defer ds.poolMu.Unlock()
ds.llmServerPool = pool
}

func (ds *K8sDatastore) getLLMServerPool() (*v1alpha1.LLMServerPool, error) {
ds.poolMu.RLock()
defer ds.poolMu.RUnlock()
if ds.llmServerPool == nil {
return nil, fmt.Errorf("LLMServerPool hasn't been initialized yet")
}
return ds.llmServerPool, nil
}
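A hypothetical call site, showing the intended contract: treat the error as "pool not initialized yet" instead of assuming a usable default.

```go
// Hypothetical caller; not part of this diff.
pool, err := ds.getLLMServerPool()
if err != nil {
	return fmt.Errorf("datastore not ready: %w", err)
}
port := pool.Spec.TargetPort // never the silent 0 of an uninitialized pool
```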

func (ds *K8sDatastore) GetPodIPs() []string {
var ips []string
ds.Pods.Range(func(name, pod any) bool {
ds.pods.Range(func(name, pod any) bool {
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
return true
})
return ips
}

func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.Model) {
s.LLMServices.Range(func(k, v any) bool {
s.llmServices.Range(func(k, v any) bool {
service := v.(*v1alpha1.LLMService)
klog.V(3).Infof("Service name: %v", service.Name)
for _, model := range service.Spec.Models {
7 changes: 3 additions & 4 deletions pkg/ext-proc/backend/datastore_test.go
@@ -10,10 +10,9 @@ var ()

func TestRandomWeightedDraw(t *testing.T) {
tests := []struct {
name string
datastore K8sDatastore
model *v1alpha1.Model
want string
name string
model *v1alpha1.Model
want string
}{
{
name: "'random' distribution",
42 changes: 27 additions & 15 deletions pkg/ext-proc/backend/endpointslice_reconciler.go
@@ -9,7 +9,9 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

var (
@@ -28,18 +30,14 @@ type EndpointSliceReconciler struct {
}

func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(1).Info("reconciling EndpointSlice ", req.NamespacedName)
klog.V(2).Info("Reconciling EndpointSlice ", req.NamespacedName)

endpointSlice := &discoveryv1.EndpointSlice{}
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
klog.Error(err, "unable to get LLMServerPool")
klog.Errorf("Unable to get EndpointSlice: %v", err)
return ctrl.Result{}, err
}

if !c.ownsEndPointSlice(endpointSlice.ObjectMeta.Labels) {
return ctrl.Result{}, nil
}

c.updateDatastore(endpointSlice)

return ctrl.Result{}, nil
@@ -50,9 +48,9 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli
for _, endpoint := range slice.Endpoints {
klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
if c.validPod(endpoint) {
pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.LLMServerPool.Spec.TargetPort)}
pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.llmServerPool.Spec.TargetPort)}
podMap[pod] = true
c.Datastore.Pods.Store(pod, true)
c.Datastore.pods.Store(pod, true)
}
}

@@ -63,23 +61,37 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli
return false
}
if _, ok := podMap[pod]; !ok {
c.Datastore.Pods.Delete(pod)
c.Datastore.pods.Delete(pod)
}
return true
}
c.Datastore.Pods.Range(removeOldPods)
c.Datastore.pods.Range(removeOldPods)
}

func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
llmServerPoolAvailable := func(object client.Object) bool {
Contributor Author:

Important: Do not reconcile EndpointSlice until LLMServerPool is initialized

_, err := c.Datastore.getLLMServerPool()
if err != nil {
klog.Warningf("Skipping reconciling EndpointSlice because LLMServerPool is not available yet: %v", err)
}
return err == nil
}

ownsEndPointSlice := func(object client.Object) bool {
// Check if the object is an EndpointSlice
endpointSlice, ok := object.(*discoveryv1.EndpointSlice)
if !ok {
return false
}

return endpointSlice.ObjectMeta.Labels[serviceOwnerLabel] == c.ServiceName
}

return ctrl.NewControllerManagedBy(mgr).
For(&discoveryv1.EndpointSlice{}).
For(&discoveryv1.EndpointSlice{}, builder.WithPredicates(predicate.NewPredicateFuncs(llmServerPoolAvailable), predicate.NewPredicateFuncs(ownsEndPointSlice))).
Complete(c)
}
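Worth noting: predicate.NewPredicateFuncs applies the one filter to every event type, and builder.WithPredicates ANDs the predicates it receives, so both llmServerPoolAvailable and ownsEndPointSlice must return true before Reconcile runs. Roughly what NewPredicateFuncs(filter) expands to in controller-runtime:

```go
// Sketch of controller-runtime's predicate.NewPredicateFuncs(filter):
// the same filter gates create, update, delete, and generic events.
predicate.Funcs{
	CreateFunc:  func(e event.CreateEvent) bool { return filter(e.Object) },
	UpdateFunc:  func(e event.UpdateEvent) bool { return filter(e.ObjectNew) },
	DeleteFunc:  func(e event.DeleteEvent) bool { return filter(e.Object) },
	GenericFunc: func(e event.GenericEvent) bool { return filter(e.Object) },
}
```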

func (c *EndpointSliceReconciler) ownsEndPointSlice(labels map[string]string) bool {
return labels[serviceOwnerLabel] == c.ServiceName
}

func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone
return validZone && *endpoint.Conditions.Ready == true
40 changes: 17 additions & 23 deletions pkg/ext-proc/backend/endpointslice_reconcilier_test.go
@@ -18,15 +18,15 @@ var (
func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
tests := []struct {
name string
datastore K8sDatastore
datastore *K8sDatastore
incomingSlice *discoveryv1.EndpointSlice
want K8sDatastore
wantPods *sync.Map
}{
{
name: "Add new pod",
datastore: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
LLMServerPool: &v1alpha1.LLMServerPool{
datastore: &K8sDatastore{
pods: populateMap(basePod1, basePod2),
llmServerPool: &v1alpha1.LLMServerPool{
Spec: v1alpha1.LLMServerPoolSpec{
TargetPort: int32(8000),
},
@@ -66,15 +66,13 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
},
},
},
want: K8sDatastore{
Pods: populateMap(basePod1, basePod2, basePod3),
},
wantPods: populateMap(basePod1, basePod2, basePod3),
},
{
name: "New pod, but its not ready yet. Do not add.",
datastore: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
LLMServerPool: &v1alpha1.LLMServerPool{
datastore: &K8sDatastore{
pods: populateMap(basePod1, basePod2),
llmServerPool: &v1alpha1.LLMServerPool{
Spec: v1alpha1.LLMServerPoolSpec{
TargetPort: int32(8000),
},
@@ -114,15 +112,13 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
},
},
},
want: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
},
wantPods: populateMap(basePod1, basePod2),
},
{
name: "Existing pod not ready, new pod added, and is ready",
datastore: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
LLMServerPool: &v1alpha1.LLMServerPool{
datastore: &K8sDatastore{
pods: populateMap(basePod1, basePod2),
llmServerPool: &v1alpha1.LLMServerPool{
Spec: v1alpha1.LLMServerPoolSpec{
TargetPort: int32(8000),
},
@@ -162,18 +158,16 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
},
},
},
want: K8sDatastore{
Pods: populateMap(basePod3, basePod2),
},
wantPods: populateMap(basePod3, basePod2),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
endpointSliceReconciler := &EndpointSliceReconciler{Datastore: &test.datastore, Zone: ""}
endpointSliceReconciler := &EndpointSliceReconciler{Datastore: test.datastore, Zone: ""}
endpointSliceReconciler.updateDatastore(test.incomingSlice)

if mapsEqual(endpointSliceReconciler.Datastore.Pods, test.want.Pods) {
t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.Pods, test.want.Pods)
if mapsEqual(endpointSliceReconciler.Datastore.pods, test.wantPods) {
t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.pods, test.wantPods)
}
})
}
97 changes: 0 additions & 97 deletions pkg/ext-proc/backend/llmlserverpool_reconciler_test.go

This file was deleted.

13 changes: 4 additions & 9 deletions pkg/ext-proc/backend/llmserverpool_reconciler.go
@@ -33,25 +33,20 @@ func (c *LLMServerPoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
if req.NamespacedName.Name != c.ServerPoolName && req.NamespacedName.Namespace != c.Namespace {
return ctrl.Result{}, nil
}
klog.V(1).Info("reconciling LLMServerPool", req.NamespacedName)
klog.V(2).Infof("Reconciling LLMServerPool %v", req.NamespacedName)

serverPool := &v1alpha1.LLMServerPool{}
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
klog.Error(err, "unable to get LLMServerPool")
klog.Errorf("Unable to get LLMServerPool: %v", err)
return ctrl.Result{}, err
}

c.updateDatastore(serverPool)
klog.V(2).Infof("Updated LLMServerPool: %+v", serverPool)
c.Datastore.setLLMServerPool(serverPool)

return ctrl.Result{}, nil
}

func (c *LLMServerPoolReconciler) updateDatastore(serverPool *v1alpha1.LLMServerPool) {
if c.Datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.Datastore.LLMServerPool.ObjectMeta.ResourceVersion {
c.Datastore.LLMServerPool = serverPool
}
}

func (c *LLMServerPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.LLMServerPool{}).