Skip to content

Commit 59fe87c

Browse files
committed
Collect consumed metric message by message
Instead of collecting after the whole chunk is dispatched. This makes the metric more accurate when message processing takes some time.
1 parent deedc79 commit 59fe87c

File tree

1 file changed

+2
-4
lines changed

1 file changed

+2
-4
lines changed

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,6 @@ static int handleDeliver(
451451
}
452452

453453
metricsCollector.chunk(numEntries);
454-
long messagesRead = 0;
455454
MutableBoolean messageIgnored = new MutableBoolean(false);
456455

457456
while (numRecords != 0) {
@@ -482,7 +481,7 @@ static int handleDeliver(
482481
subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
483482
messageIgnored.set(false);
484483
} else {
485-
messagesRead++;
484+
metricsCollector.consume(1);
486485
}
487486
numRecords--;
488487
offset++; // works even for unsigned long
@@ -551,7 +550,7 @@ static int handleDeliver(
551550
subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
552551
messageIgnored.set(false);
553552
} else {
554-
messagesRead++;
553+
metricsCollector.consume(1);
555554
}
556555
numRecordsInBatch--;
557556
offset++; // works even for unsigned long
@@ -564,7 +563,6 @@ static int handleDeliver(
564563
}
565564
}
566565
}
567-
metricsCollector.consume(messagesRead);
568566
return read;
569567
}
570568

0 commit comments

Comments
 (0)