Skip to content

Commit 98b9371

Browse files
committed
Address comments
1 parent 0241829 commit 98b9371

File tree

11 files changed

+190
-242
lines changed

11 files changed

+190
-242
lines changed

pkg/epp/backend/metrics/fake.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"sync"
2323

24+
corev1 "k8s.io/api/core/v1"
2425
"k8s.io/apimachinery/pkg/types"
2526
"sigs.k8s.io/controller-runtime/pkg/log"
2627
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
@@ -39,8 +40,8 @@ func (fpm *FakePodMetrics) GetPod() *Pod {
3940
func (fpm *FakePodMetrics) GetMetrics() *Metrics {
4041
return fpm.Metrics
4142
}
42-
func (fpm *FakePodMetrics) UpdatePod(pod *Pod) {
43-
fpm.Pod = pod
43+
func (fpm *FakePodMetrics) UpdatePod(pod *corev1.Pod) {
44+
fpm.Pod = toInternalPod(pod)
4445
}
4546
func (fpm *FakePodMetrics) StopRefreshLoop() {} // noop
4647

pkg/epp/backend/metrics/logger.go

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ import (
2020
"context"
2121
"time"
2222

23+
"github.com/go-logr/logr"
2324
"sigs.k8s.io/controller-runtime/pkg/log"
25+
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha2"
26+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
2427
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2528
)
2629

@@ -31,12 +34,30 @@ const (
3134
)
3235

3336
type Datastore interface {
37+
PoolGet() (*v1alpha2.InferencePool, error)
38+
// PodMetrics operations
39+
// PodGetAll returns all pods and metrics, including fresh and stale.
40+
PodGetAll() []PodMetrics
3441
PodList(func(PodMetrics) bool) []PodMetrics
3542
}
3643

37-
func PrintMetricsForDebugging(ctx context.Context, datastore Datastore) {
44+
func LogMetricsPeriodically(ctx context.Context, datastore Datastore, refreshPrometheusMetricsInterval time.Duration) {
3845
logger := log.FromContext(ctx)
3946

47+
// Periodically flush prometheus metrics for inference pool
48+
go func() {
49+
for {
50+
select {
51+
case <-ctx.Done():
52+
logger.V(logutil.DEFAULT).Info("Shutting down prometheus metrics thread")
53+
return
54+
default:
55+
time.Sleep(refreshPrometheusMetricsInterval)
56+
flushPrometheusMetricsOnce(logger, datastore)
57+
}
58+
}
59+
}()
60+
4061
// Periodically print out the pods and metrics for DEBUGGING.
4162
if logger := logger.V(logutil.DEBUG); logger.Enabled() {
4263
go func() {
@@ -59,3 +80,30 @@ func PrintMetricsForDebugging(ctx context.Context, datastore Datastore) {
5980
}()
6081
}
6182
}
83+
84+
func flushPrometheusMetricsOnce(logger logr.Logger, datastore Datastore) {
85+
pool, err := datastore.PoolGet()
86+
if err != nil {
87+
// No inference pool or not initialize.
88+
logger.V(logutil.VERBOSE).Info("pool is not initialized, skipping flushing metrics")
89+
return
90+
}
91+
92+
var kvCacheTotal float64
93+
var queueTotal int
94+
95+
podMetrics := datastore.PodGetAll()
96+
logger.V(logutil.VERBOSE).Info("Flushing Prometheus Metrics", "ReadyPods", len(podMetrics))
97+
if len(podMetrics) == 0 {
98+
return
99+
}
100+
101+
for _, pod := range podMetrics {
102+
kvCacheTotal += pod.GetMetrics().KVCacheUsagePercent
103+
queueTotal += pod.GetMetrics().WaitingQueueSize
104+
}
105+
106+
podTotalCount := len(podMetrics)
107+
metrics.RecordInferencePoolAvgKVCache(pool.Name, kvCacheTotal/float64(podTotalCount))
108+
metrics.RecordInferencePoolAvgQueueSize(pool.Name, float64(queueTotal/podTotalCount))
109+
}

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import (
2424
"unsafe"
2525

2626
"github.com/go-logr/logr"
27+
corev1 "k8s.io/api/core/v1"
28+
"k8s.io/apimachinery/pkg/types"
2729

2830
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
2931
)
@@ -58,8 +60,18 @@ func (pm *podMetrics) GetMetrics() *Metrics {
5860
return (*Metrics)(atomic.LoadPointer(&pm.metrics))
5961
}
6062

61-
func (pm *podMetrics) UpdatePod(pod *Pod) {
62-
atomic.StorePointer(&pm.pod, unsafe.Pointer(pod))
63+
func (pm *podMetrics) UpdatePod(in *corev1.Pod) {
64+
atomic.StorePointer(&pm.pod, unsafe.Pointer(toInternalPod(in)))
65+
}
66+
67+
func toInternalPod(in *corev1.Pod) *Pod {
68+
return &Pod{
69+
NamespacedName: types.NamespacedName{
70+
Name: in.Name,
71+
Namespace: in.Namespace,
72+
},
73+
Address: in.Status.PodIP,
74+
}
6375
}
6476

6577
// start starts a goroutine exactly once to periodically update metrics. The goroutine will be

pkg/epp/backend/metrics/pod_metrics_test.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,19 @@ import (
2323
"github.com/google/go-cmp/cmp"
2424
"github.com/google/go-cmp/cmp/cmpopts"
2525
"github.com/stretchr/testify/assert"
26+
corev1 "k8s.io/api/core/v1"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2628
"k8s.io/apimachinery/pkg/types"
2729
)
2830

2931
var (
30-
pod1 = &Pod{
31-
NamespacedName: types.NamespacedName{
32-
Name: "pod1",
32+
pod1 = &corev1.Pod{
33+
ObjectMeta: metav1.ObjectMeta{
34+
Name: "pod1",
35+
Namespace: "default",
3336
},
3437
}
35-
metrics = &Metrics{
38+
initial = &Metrics{
3639
WaitingQueueSize: 0,
3740
KVCacheUsagePercent: 0.2,
3841
MaxActiveModels: 2,
@@ -59,22 +62,19 @@ func TestMetricsRefresh(t *testing.T) {
5962
// The refresher is initialized with empty metrics.
6063
pm := pmf.NewPodMetrics(ctx, pod1, 8000)
6164

65+
namespacedName := types.NamespacedName{Name: pod1.Name, Namespace: pod1.Namespace}
6266
// Use SetRes to simulate an update of metrics from the pod.
6367
// Verify that the metrics are updated.
64-
pmc.SetRes(map[types.NamespacedName]*Metrics{
65-
pod1.NamespacedName: metrics,
66-
})
68+
pmc.SetRes(map[types.NamespacedName]*Metrics{namespacedName: initial})
6769
condition := func(collect *assert.CollectT) {
68-
assert.True(collect, cmp.Equal(pm.GetMetrics(), metrics, cmpopts.IgnoreFields(Metrics{}, "UpdateTime")))
70+
assert.True(collect, cmp.Equal(pm.GetMetrics(), initial, cmpopts.IgnoreFields(Metrics{}, "UpdateTime")))
6971
}
7072
assert.EventuallyWithT(t, condition, time.Second, time.Millisecond)
7173

7274
// Stop the loop, and simulate metric update again, this time the PodMetrics won't get the
7375
// new update.
7476
pm.StopRefreshLoop()
75-
pmc.SetRes(map[types.NamespacedName]*Metrics{
76-
pod1.NamespacedName: updated,
77-
})
77+
pmc.SetRes(map[types.NamespacedName]*Metrics{namespacedName: updated})
7878
// Still expect the same condition (no metrics update).
7979
assert.EventuallyWithT(t, condition, time.Second, time.Millisecond)
8080
}

pkg/epp/backend/metrics/types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"time"
2525
"unsafe"
2626

27+
corev1 "k8s.io/api/core/v1"
2728
"k8s.io/apimachinery/pkg/types"
2829
"sigs.k8s.io/controller-runtime/pkg/log"
2930
)
@@ -40,9 +41,9 @@ type PodMetricsFactory struct {
4041
refreshMetricsInterval time.Duration
4142
}
4243

43-
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, pod *Pod, port int32) PodMetrics {
44+
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, port int32) PodMetrics {
4445
pm := &podMetrics{
45-
pod: unsafe.Pointer(pod),
46+
pod: unsafe.Pointer(toInternalPod(in)),
4647
metrics: unsafe.Pointer(newMetrics()),
4748
pmc: f.pmc,
4849
targetPort: port,
@@ -59,7 +60,7 @@ func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, pod *Pod, p
5960
type PodMetrics interface {
6061
GetPod() *Pod
6162
GetMetrics() *Metrics
62-
UpdatePod(*Pod)
63+
UpdatePod(*corev1.Pod)
6364
StopRefreshLoop()
6465
}
6566

0 commit comments

Comments
 (0)