Bug fixes: 1. NPE when model is not found 2. Port is considered 0 when LLMServerPool is not initialized #79
Changes from all commits
First changed file: the K8sDatastore in package backend.
@@ -1,6 +1,7 @@
 package backend

 import (
+	"fmt"
 	"math/rand"
 	"sync"

@@ -9,24 +10,65 @@ import (
 	"k8s.io/klog/v2"
 )

+func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
+	store := &K8sDatastore{
+		poolMu:      sync.RWMutex{},
+		llmServices: &sync.Map{},
+		pods:        &sync.Map{},
+	}
+	for _, opt := range options {
+		opt(store)
+	}
+	return store
+}
+
 // The datastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from k8s-api)
 type K8sDatastore struct {
-	LLMServerPool *v1alpha1.LLMServerPool
-	LLMServices   *sync.Map
-	Pods          *sync.Map
+	// poolMu is used to synchronize access to the llmServerPool.
+	poolMu        sync.RWMutex
+	llmServerPool *v1alpha1.LLMServerPool
+	llmServices   *sync.Map
+	pods          *sync.Map
 }

+type K8sDatastoreOption func(*K8sDatastore)
+
+// WithPods can be used in tests to override the pods.
+func WithPods(pods []*PodMetrics) K8sDatastoreOption {
+	return func(store *K8sDatastore) {
+		store.pods = &sync.Map{}
+		for _, pod := range pods {
+			store.pods.Store(pod.Pod, true)
+		}
+	}
+}
+
+func (ds *K8sDatastore) setLLMServerPool(pool *v1alpha1.LLMServerPool) {
+	ds.poolMu.Lock()
Review thread on this line:
- "Have we experienced, or do we expect, a race condition with the serverPool?"
- "Yes, it happens pretty often actually. If you try restarting the EPP many times, you can probably hit it."
- "Yes, I agree that the startup race condition is annoying, and this PR resolves that in your predicate func. But I think this is unrelated. I suppose it's not a big deal, and it can keep us safer. Getters/setters can add code bloat and complicate simple logic. Tying this into the export comment thread as well, based on the same idea."
- "Yeah, it's common practice to hide internal fields until it's necessary to export them; it reduces the chance of bugs. The llmServerPool field is particularly important here because I need to synchronize access to it."
- "Yeah, I think that's more of a Java/C# style, but I won't dig too hard here."
- "Go doesn't encourage the 'GetXX' naming, so in this case I should name it 'LLMServerPool' instead of 'GetLLMServerPool'. But the getter/setter pattern is common in Go as well; see the 'Getter' section of https://go.dev/doc/effective_go."
+	defer ds.poolMu.Unlock()
+	ds.llmServerPool = pool
+}
+
+func (ds *K8sDatastore) getLLMServerPool() (*v1alpha1.LLMServerPool, error) {
+	ds.poolMu.RLock()
+	defer ds.poolMu.RUnlock()
+	if ds.llmServerPool == nil {
+		return nil, fmt.Errorf("LLMServerPool hasn't been initialized yet")
+	}
+	return ds.llmServerPool, nil
+}
+
 func (ds *K8sDatastore) GetPodIPs() []string {
 	var ips []string
-	ds.Pods.Range(func(name, pod any) bool {
+	ds.pods.Range(func(name, pod any) bool {
 		ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
 		return true
 	})
 	return ips
 }

 func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.Model) {
-	s.LLMServices.Range(func(k, v any) bool {
+	s.llmServices.Range(func(k, v any) bool {
 		service := v.(*v1alpha1.LLMService)
 		klog.V(3).Infof("Service name: %v", service.Name)
 		for _, model := range service.Spec.Models {
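The race discussed in the review thread above is between the LLMServerPool reconciler writing the pool and the EndpointSlice reconciler reading it; the new poolMu-guarded accessors close it. Below is a minimal, generic sketch of the same RWMutex getter/setter pattern. The types and names are illustrative stand-ins, not the repo's.

package main

import (
	"fmt"
	"sync"
)

// Illustrative stand-in for the pool object; not the repo's v1alpha1 type.
type serverPool struct{ targetPort int32 }

type datastore struct {
	mu   sync.RWMutex
	pool *serverPool
}

func (d *datastore) setPool(p *serverPool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.pool = p
}

func (d *datastore) getPool() (*serverPool, error) {
	d.mu.RLock()
	defer d.mu.RUnlock()
	if d.pool == nil {
		return nil, fmt.Errorf("pool hasn't been initialized yet")
	}
	return d.pool, nil
}

func main() {
	d := &datastore{}
	var wg sync.WaitGroup
	wg.Add(2)

	// One goroutine plays the LLMServerPool reconciler (writer), the other the
	// EndpointSlice reconciler (reader); every access goes through the guarded
	// accessors, so running this under `go run -race` stays clean.
	go func() {
		defer wg.Done()
		d.setPool(&serverPool{targetPort: 8000})
	}()
	go func() {
		defer wg.Done()
		if p, err := d.getPool(); err != nil {
			fmt.Println("not ready yet:", err)
		} else {
			fmt.Println("target port:", p.targetPort)
		}
	}()
	wg.Wait()
}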
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,9 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
 )

 var (

@@ -28,18 +30,14 @@ type EndpointSliceReconciler struct {
 }

 func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	klog.V(1).Info("reconciling EndpointSlice ", req.NamespacedName)
+	klog.V(2).Info("Reconciling EndpointSlice ", req.NamespacedName)

 	endpointSlice := &discoveryv1.EndpointSlice{}
 	if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
-		klog.Error(err, "unable to get LLMServerPool")
+		klog.Errorf("Unable to get EndpointSlice: %v", err)
 		return ctrl.Result{}, err
 	}

-	if !c.ownsEndPointSlice(endpointSlice.ObjectMeta.Labels) {
-		return ctrl.Result{}, nil
-	}
-
 	c.updateDatastore(endpointSlice)

 	return ctrl.Result{}, nil

@@ -50,9 +48,9 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli
 	for _, endpoint := range slice.Endpoints {
 		klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
 		if c.validPod(endpoint) {
-			pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.LLMServerPool.Spec.TargetPort)}
+			pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.llmServerPool.Spec.TargetPort)}
 			podMap[pod] = true
-			c.Datastore.Pods.Store(pod, true)
+			c.Datastore.pods.Store(pod, true)
 		}
 	}

@@ -63,23 +61,37 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli
 			return false
 		}
 		if _, ok := podMap[pod]; !ok {
-			c.Datastore.Pods.Delete(pod)
+			c.Datastore.pods.Delete(pod)
 		}
 		return true
 	}
-	c.Datastore.Pods.Range(removeOldPods)
+	c.Datastore.pods.Range(removeOldPods)
 }

 func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	llmServerPoolAvailable := func(object client.Object) bool {
Review comment on this line: "Important: Do not reconcile EndpointSlice until LLMServerPool is initialized."
+		_, err := c.Datastore.getLLMServerPool()
+		if err != nil {
+			klog.Warningf("Skipping reconciling EndpointSlice because LLMServerPool is not available yet: %v", err)
+		}
+		return err == nil
+	}
+
+	ownsEndPointSlice := func(object client.Object) bool {
+		// Check if the object is an EndpointSlice
+		endpointSlice, ok := object.(*discoveryv1.EndpointSlice)
+		if !ok {
+			return false
+		}
+
+		return endpointSlice.ObjectMeta.Labels[serviceOwnerLabel] == c.ServiceName
+	}
+
 	return ctrl.NewControllerManagedBy(mgr).
-		For(&discoveryv1.EndpointSlice{}).
+		For(&discoveryv1.EndpointSlice{}, builder.WithPredicates(predicate.NewPredicateFuncs(llmServerPoolAvailable), predicate.NewPredicateFuncs(ownsEndPointSlice))).
 		Complete(c)
 }

-func (c *EndpointSliceReconciler) ownsEndPointSlice(labels map[string]string) bool {
-	return labels[serviceOwnerLabel] == c.ServiceName
-}
-
 func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
 	validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone
 	return validZone && *endpoint.Conditions.Ready == true
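Controller-runtime applies every predicate passed to builder.WithPredicates, so with this change an event only reaches Reconcile when the LLMServerPool has been initialized and the EndpointSlice carries the expected owner label. The following is a minimal test-style sketch of how such a predicate can be exercised; it assumes it lives in the same backend package (so it can reuse the serviceOwnerLabel constant), and the test name and "test-service" value are illustrative.

package backend

import (
	"testing"

	discoveryv1 "k8s.io/api/discovery/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// Illustrative test: predicate.NewPredicateFuncs wraps a plain filter func and
// applies it to create/update/delete/generic events, which is what the
// reconciler setup above relies on to drop events early.
func TestOwnershipPredicate(t *testing.T) {
	const serviceName = "test-service" // hypothetical value for the test

	owns := predicate.NewPredicateFuncs(func(object client.Object) bool {
		slice, ok := object.(*discoveryv1.EndpointSlice)
		if !ok {
			return false
		}
		return slice.ObjectMeta.Labels[serviceOwnerLabel] == serviceName
	})

	owned := &discoveryv1.EndpointSlice{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{serviceOwnerLabel: serviceName},
	}}
	foreign := &discoveryv1.EndpointSlice{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{serviceOwnerLabel: "some-other-service"},
	}}

	if !owns.Create(event.CreateEvent{Object: owned}) {
		t.Error("expected the owned EndpointSlice to pass the predicate")
	}
	if owns.Create(event.CreateEvent{Object: foreign}) {
		t.Error("expected the foreign EndpointSlice to be filtered out")
	}
}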
This file was deleted.
Review thread:
- "I don't think I agree with keeping these properties private. This assumes that access will only ever be from the backend package, and I'm not sure that will be true. This package is already rather a catch-all and could use some refactoring."
- "Then you can make it public or expose a public method. That's what I am doing here. I have synchronized access to the fields so other packages won't mess with them. The bug is that main.go sets an empty LLMServerPool, which the EndpointSliceReconciler later reads, picking up the default port value 0. Making the fields private prevents such bugs."
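The port-0 symptom follows from Go's zero values: an empty but non-nil pool object yields TargetPort == 0 without any error. Below is a minimal, self-contained sketch of that trap and of the nil-until-initialized approach this PR takes; the Pool/PoolSpec types are illustrative stand-ins, not the repo's v1alpha1 API.

package main

import "fmt"

// Illustrative stand-ins for the LLMServerPool spec; not the repo's v1alpha1 types.
type PoolSpec struct {
	TargetPort int32
}

type Pool struct {
	Spec PoolSpec
}

func main() {
	// Seeding the datastore with an empty (but non-nil) pool hides the problem:
	// TargetPort is the int32 zero value, so address construction silently
	// yields ":0" instead of failing.
	empty := &Pool{}
	fmt.Println("10.0.0.1:" + fmt.Sprint(empty.Spec.TargetPort)) // prints 10.0.0.1:0

	// Keeping the pool nil until a reconcile sets it, and returning an explicit
	// error from the getter (as getLLMServerPool now does), surfaces the
	// uninitialized state so callers can skip work instead of using port 0.
	var pool *Pool // stays nil until an LLMServerPool reconcile sets it
	if pool == nil {
		fmt.Println("LLMServerPool hasn't been initialized yet; skipping reconcile")
	} else {
		fmt.Println("10.0.0.1:" + fmt.Sprint(pool.Spec.TargetPort))
	}
}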