From 59fe87cdb17cd97c8d6a7eb485fe818eda12a004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 31 Jan 2025 09:09:06 +0100 Subject: [PATCH] Collect consumed metric message by message Instead of collecting after the whole chunk is dispatched. This makes the metric more accurate when message processing takes some time. --- .../java/com/rabbitmq/stream/impl/ServerFrameHandler.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java index c8410bbd0f..93050de1b6 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java +++ b/src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java @@ -451,7 +451,6 @@ static int handleDeliver( } metricsCollector.chunk(numEntries); - long messagesRead = 0; MutableBoolean messageIgnored = new MutableBoolean(false); while (numRecords != 0) { @@ -482,7 +481,7 @@ static int handleDeliver( subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext); messageIgnored.set(false); } else { - messagesRead++; + metricsCollector.consume(1); } numRecords--; offset++; // works even for unsigned long @@ -551,7 +550,7 @@ static int handleDeliver( subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext); messageIgnored.set(false); } else { - messagesRead++; + metricsCollector.consume(1); } numRecordsInBatch--; offset++; // works even for unsigned long @@ -564,7 +563,6 @@ static int handleDeliver( } } } - metricsCollector.consume(messagesRead); return read; }