Skip to content

Commit baf6c37

Browse files
committed
Fix metric scrape port not being updated when the inference pool's target port is updated
Signed-off-by: Kuromesi <[email protected]>
1 parent 7ed54a4 commit baf6c37

File tree

7 files changed

+58
-21
lines changed

7 files changed

+58
-21
lines changed

pkg/epp/backend/provider.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -94,7 +94,7 @@ func (p *Provider) Init(ctx context.Context, refreshMetricsInterval, refreshProm
9494
return
9595
default:
9696
time.Sleep(5 * time.Second)
97-
logger.Info("Current Pods and metrics gathered", "metrics", p.datastore.PodGetAll())
97+
logger.Info("Current Pods and metrics gathered", "metrics", p.datastore.PodList())
9898
}
9999
}
100100
}()
@@ -162,7 +162,7 @@ func (p *Provider) flushPrometheusMetricsOnce(logger logr.Logger) {
162162
var kvCacheTotal float64
163163
var queueTotal int
164164

165-
podMetrics := p.datastore.PodGetAll()
165+
podMetrics := p.datastore.PodList()
166166
if len(podMetrics) == 0 {
167167
return
168168
}

pkg/epp/backend/provider_test.go

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -116,6 +116,7 @@ func TestProvider(t *testing.T) {
116116
{
117117
Pod: datastore.Pod{NamespacedName: pod2.NamespacedName},
118118
Metrics: datastore.Metrics{
119+
ActiveModels: map[string]int{},
119120
WaitingQueueSize: 0,
120121
KVCacheUsagePercent: 0,
121122
MaxActiveModels: 0,
@@ -133,7 +134,7 @@ func TestProvider(t *testing.T) {
133134
defer cancel()
134135
_ = p.Init(ctx, time.Millisecond, time.Millisecond)
135136
assert.EventuallyWithT(t, func(t *assert.CollectT) {
136-
metrics := ds.PodGetAll()
137+
metrics := ds.PodList()
137138
diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *datastore.PodMetrics) bool {
138139
return a.String() < b.String()
139140
}))

pkg/epp/controller/inferencepool_reconciler.go

Lines changed: 7 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,13 @@ func (c *InferencePoolReconciler) updateDatastore(ctx context.Context, newPool *
8585
// the ones that may have existed already to the store.
8686
c.Datastore.PodResyncAll(ctx, c.Client)
8787
}
88+
if oldPool != nil && newPool.Spec.TargetPortNumber != oldPool.Spec.TargetPortNumber {
89+
logger.V(logutil.DEFAULT).Info("Updating scrape port of endpoints", "targetPortNumber", newPool.Spec.TargetPortNumber)
90+
c.Datastore.PodUpdate(func(pod *datastore.PodMetrics) *datastore.PodMetrics {
91+
pod.ScrapePort = newPool.Spec.TargetPortNumber
92+
return pod
93+
})
94+
}
8895
}
8996

9097
func (c *InferencePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {

pkg/epp/controller/inferencepool_reconciler_test.go

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -134,6 +134,13 @@ func TestInferencePoolReconciler(t *testing.T) {
134134
t.Errorf("Unexpected diff (+got/-want): %s", diff)
135135
}
136136

137+
pods := datastore.PodList()
138+
for _, pm := range pods {
139+
if pm.ScrapePort != newPool1.Spec.TargetPortNumber {
140+
t.Errorf("Unexpected pod scrape port: %d", pm.ScrapePort)
141+
}
142+
}
143+
137144
// Step 4: delete the pool to trigger a datastore clear
138145
if err := fakeClient.Get(ctx, req.NamespacedName, newPool1); err != nil {
139146
t.Errorf("Unexpected pool get error: %v", err)
@@ -166,7 +173,7 @@ func diffStore(datastore datastore.Datastore, params diffStoreParams) string {
166173
params.wantPods = []string{}
167174
}
168175
gotPods := []string{}
169-
for _, pm := range datastore.PodGetAll() {
176+
for _, pm := range datastore.PodList() {
170177
gotPods = append(gotPods, pm.NamespacedName.Name)
171178
}
172179
if diff := cmp.Diff(params.wantPods, gotPods, cmpopts.SortSlices(func(a, b string) bool { return a < b })); diff != "" {

pkg/epp/datastore/datastore.go

Lines changed: 37 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -62,14 +62,19 @@ type Datastore interface {
6262
PodGet(namespacedName types.NamespacedName) (*PodMetrics, bool)
6363
PodDelete(namespacedName types.NamespacedName)
6464
PodResyncAll(ctx context.Context, ctrlClient client.Client)
65-
PodGetAll() []*PodMetrics
65+
PodList(predicates ...PodListPredicate) []*PodMetrics
6666
PodDeleteAll() // This is only for testing.
6767
PodRange(f func(key, value any) bool)
68+
PodUpdate(f PodUpdateFunc, predicates ...PodListPredicate)
6869

6970
// Clears the store state, happens when the pool gets deleted.
7071
Clear()
7172
}
7273

74+
type PodUpdateFunc func(pod *PodMetrics) *PodMetrics
75+
76+
type PodListPredicate func(pod *PodMetrics) bool
77+
7378
func NewDatastore() Datastore {
7479
store := &datastore{
7580
poolAndModelsMu: sync.RWMutex{},
@@ -231,8 +236,9 @@ func (ds *datastore) ModelGetAll() []*v1alpha2.InferenceModel {
231236
// /// Pods/endpoints APIs ///
232237
func (ds *datastore) PodUpdateMetricsIfExist(namespacedName types.NamespacedName, m *Metrics) bool {
233238
if val, ok := ds.pods.Load(namespacedName); ok {
234-
existing := val.(*PodMetrics)
239+
existing := val.(*PodMetrics).Clone()
235240
existing.Metrics = *m
241+
ds.pods.Store(namespacedName, existing)
236242
return true
237243
}
238244
return false
@@ -241,23 +247,35 @@ func (ds *datastore) PodUpdateMetricsIfExist(namespacedName types.NamespacedName
241247
func (ds *datastore) PodGet(namespacedName types.NamespacedName) (*PodMetrics, bool) {
242248
val, ok := ds.pods.Load(namespacedName)
243249
if ok {
244-
return val.(*PodMetrics), true
250+
return val.(*PodMetrics).Clone(), true
245251
}
246252
return nil, false
247253
}
248254

249-
func (ds *datastore) PodGetAll() []*PodMetrics {
255+
// PodList returns a list of PodMetrics that match the given predicates.
256+
// If no predicates are provided, all PodMetrics will be returned.
257+
func (ds *datastore) PodList(predicates ...PodListPredicate) []*PodMetrics {
250258
res := []*PodMetrics{}
251-
fn := func(k, v any) bool {
252-
res = append(res, v.(*PodMetrics))
259+
fn := func(_, v any) bool {
260+
pod := v.(*PodMetrics)
261+
for _, p := range predicates {
262+
if !p(pod) {
263+
continue
264+
}
265+
}
266+
res = append(res, pod)
253267
return true
254268
}
255-
ds.pods.Range(fn)
269+
ds.PodRange(fn)
256270
return res
257271
}
258272

259273
func (ds *datastore) PodRange(f func(key, value any) bool) {
260-
ds.pods.Range(f)
274+
withClone := func(key, value any) bool {
275+
pod := value.(*PodMetrics)
276+
return f(key, pod.Clone())
277+
}
278+
ds.pods.Range(withClone)
261279
}
262280

263281
func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
@@ -280,15 +298,18 @@ func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
280298
ActiveModels: make(map[string]int),
281299
},
282300
}
283-
existing, ok := ds.pods.Load(new.NamespacedName)
284-
if !ok {
285-
ds.pods.Store(new.NamespacedName, new)
286-
return true
287-
}
301+
_, ok := ds.pods.Load(new.NamespacedName)
302+
ds.pods.Store(new.NamespacedName, new)
303+
return ok
304+
}
288305

289-
// Update pod properties if anything changed.
290-
existing.(*PodMetrics).Pod = new.Pod
291-
return false
306+
func (ds *datastore) PodUpdate(f PodUpdateFunc, predicates ...PodListPredicate) {
307+
pods := ds.PodList(predicates...)
308+
for _, pod := range pods {
309+
if new := f(pod); new != nil {
310+
ds.pods.Store(new.NamespacedName, new)
311+
}
312+
}
292313
}
293314

294315
func (ds *datastore) PodResyncAll(ctx context.Context, ctrlClient client.Client) {

pkg/epp/datastore/types.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,7 @@ func (pm *PodMetrics) Clone() *PodMetrics {
6666
},
6767
Metrics: Metrics{
6868
ActiveModels: cm,
69+
MaxActiveModels: pm.MaxActiveModels,
6970
RunningQueueSize: pm.RunningQueueSize,
7071
WaitingQueueSize: pm.WaitingQueueSize,
7172
KVCacheUsagePercent: pm.KVCacheUsagePercent,

pkg/epp/scheduling/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ type Scheduler struct {
124124
// Schedule finds the target pod based on metrics and the requested lora adapter.
125125
func (s *Scheduler) Schedule(ctx context.Context, req *LLMRequest) (targetPod datastore.PodMetrics, err error) {
126126
logger := log.FromContext(ctx).WithValues("request", req)
127-
podMetrics := s.datastore.PodGetAll()
127+
podMetrics := s.datastore.PodList()
128128
logger.V(logutil.VERBOSE).Info("Scheduling a request", "metrics", podMetrics)
129129
pods, err := s.filter.Filter(logger, req, podMetrics)
130130
if err != nil || len(pods) == 0 {

0 commit comments

Comments
 (0)