Skip to content

Commit b85f8bb

Browse files
committed
Fix: Make pod metrics StopRefreshLoop synchronous
The TestMetricsRefresh test in pod_metrics_test.go was flaky due to a race condition. The `StopRefreshLoop` method would signal the metrics refresh goroutine to stop but did not wait for its actual termination. If the test updated the mock metrics client immediately after calling `StopRefreshLoop`, the refresh goroutine could, in rare cases, perform a final metrics fetch with the new data before fully exiting. This resulted in the test asserting against unexpected metric values. This commit resolves the issue by making `StopRefreshLoop` a synchronous operation. A `sync.WaitGroup` is now used: - `wg.Add(1)` is called before the refresh goroutine starts. - `defer wg.Done()` is used within the refresh goroutine to signal completion. - `wg.Wait()` is called in `StopRefreshLoop` to block until the goroutine has fully terminated. - `stopOnce` is used to ensure the `done` channel is only closed once (for idempotency and protection against concurrent calls). This change ensures that when `StopRefreshLoop` returns, the refresh goroutine is guaranteed to have stopped, eliminating the race condition.
1 parent baf3d7d commit b85f8bb

File tree

2 files changed

+18
-10
lines changed

2 files changed

+18
-10
lines changed

pkg/epp/backend/metrics/pod_metrics.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ type podMetrics struct {
4242
ds Datastore
4343
interval time.Duration
4444

45-
once sync.Once // ensure the StartRefreshLoop is only called once.
46-
done chan struct{}
45+
startOnce sync.Once // ensures the refresh loop goroutine is started only once
46+
stopOnce sync.Once // ensures the done channel is closed only once
47+
done chan struct{}
48+
wg sync.WaitGroup // to wait for the refresh loop to terminate
4749

4850
logger logr.Logger
4951
}
@@ -86,9 +88,11 @@ func toInternalPod(pod *corev1.Pod) *backend.Pod {
8688
// start starts a goroutine exactly once to periodically update metrics. The goroutine will be
8789
// stopped either when stop() is called, or the given ctx is cancelled.
8890
func (pm *podMetrics) startRefreshLoop(ctx context.Context) {
89-
pm.once.Do(func() {
91+
pm.startOnce.Do(func() {
92+
pm.wg.Add(1)
9093
go func() {
9194
pm.logger.V(logutil.DEFAULT).Info("Starting refresher", "pod", pm.GetPod())
95+
defer pm.wg.Done()
9296
ticker := time.NewTicker(pm.interval)
9397
defer ticker.Stop()
9498
for {
@@ -138,5 +142,8 @@ func (pm *podMetrics) refreshMetrics() error {
138142

139143
func (pm *podMetrics) StopRefreshLoop() {
140144
pm.logger.V(logutil.DEFAULT).Info("Stopping refresher", "pod", pm.GetPod())
141-
close(pm.done)
145+
pm.stopOnce.Do(func() {
146+
close(pm.done)
147+
})
148+
pm.wg.Wait()
142149
}

pkg/epp/backend/metrics/types.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,13 @@ type PodMetricsFactory struct {
4343
func (f *PodMetricsFactory) NewPodMetrics(parentCtx context.Context, in *corev1.Pod, ds Datastore) PodMetrics {
4444
pod := toInternalPod(in)
4545
pm := &podMetrics{
46-
pmc: f.pmc,
47-
ds: ds,
48-
interval: f.refreshMetricsInterval,
49-
once: sync.Once{},
50-
done: make(chan struct{}),
51-
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
46+
pmc: f.pmc,
47+
ds: ds,
48+
interval: f.refreshMetricsInterval,
49+
startOnce: sync.Once{},
50+
stopOnce: sync.Once{},
51+
done: make(chan struct{}),
52+
logger: log.FromContext(parentCtx).WithValues("pod", pod.NamespacedName),
5253
}
5354
pm.pod.Store(pod)
5455
pm.metrics.Store(newMetrics())

0 commit comments

Comments
 (0)