Skip to content

Commit dd166d8

Browse files
committed
🐛 Priorityqueue: Yet another queue_depth metric fix
Inside the priorityqueues `spin` we call the metrics `add` if an item becomes ready so that the `queue_depth` metric gets incremented. To avoid doing this multiple times for the same item, we track the key in a map and remove it there when we hand the item out. If an item gets added without `RequeueAfter` that is already on the queue but with a `RequeueAfter` we also call the metrics `add` - But if we already did that in `spin` we will count the item twice.
1 parent b2dbf6e commit dd166d8

File tree

3 files changed

+21
-3
lines changed

3 files changed

+21
-3
lines changed

pkg/controller/priorityqueue/metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,11 @@ func (m *defaultQueueMetrics[T]) get(item T) {
8585
return
8686
}
8787

88+
m.depth.Dec()
89+
8890
m.mapLock.Lock()
8991
defer m.mapLock.Unlock()
9092

91-
m.depth.Dec()
92-
9393
m.processingStartTimes[item] = m.clock.Now()
9494
if startTime, exists := m.addTimes[item]; exists {
9595
m.latency.Observe(m.sinceInSeconds(startTime))

pkg/controller/priorityqueue/priorityqueue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
168168
}
169169

170170
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
171-
if readyAt == nil {
171+
if readyAt == nil && !w.becameReady.Has(key) {
172172
w.metrics.add(key)
173173
}
174174
item.ReadyAt = readyAt

pkg/controller/priorityqueue/priorityqueue_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,24 @@ var _ = Describe("Controllerworkqueue", func() {
396396
metrics.mu.Lock()
397397
Expect(metrics.depth["test"]).To(Equal(1))
398398
})
399+
400+
It("Updates metrics correctly for an item whose requeueAfter expired that gets added again without requeueAfter", func() {
401+
q, metrics := newQueue()
402+
defer q.ShutDown()
403+
404+
q.AddWithOpts(AddOpts{After: 50 * time.Millisecond}, "foo")
405+
time.Sleep(100 * time.Millisecond)
406+
407+
Expect(q.Len()).To(Equal(1))
408+
metrics.mu.Lock()
409+
Expect(metrics.depth["test"]).To(Equal(1))
410+
metrics.mu.Unlock()
411+
412+
q.AddWithOpts(AddOpts{}, "foo")
413+
Expect(q.Len()).To(Equal(1))
414+
metrics.mu.Lock()
415+
Expect(metrics.depth["test"]).To(Equal(1))
416+
})
399417
})
400418

401419
func BenchmarkAddGetDone(b *testing.B) {

0 commit comments

Comments
 (0)