Skip to content

Commit e51b25f

Browse files
authored
Merge pull request #700 from rabbitmq/collect-consumed-message-one-by-one
Collect consumed metric message by message
2 parents deedc79 + 59fe87c commit e51b25f

File tree

1 file changed

+2
-4
lines changed

1 file changed

+2
-4
lines changed

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,6 @@ static int handleDeliver(
451451
}
452452

453453
metricsCollector.chunk(numEntries);
454-
long messagesRead = 0;
455454
MutableBoolean messageIgnored = new MutableBoolean(false);
456455

457456
while (numRecords != 0) {
@@ -482,7 +481,7 @@ static int handleDeliver(
482481
subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
483482
messageIgnored.set(false);
484483
} else {
485-
messagesRead++;
484+
metricsCollector.consume(1);
486485
}
487486
numRecords--;
488487
offset++; // works even for unsigned long
@@ -551,7 +550,7 @@ static int handleDeliver(
551550
subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext);
552551
messageIgnored.set(false);
553552
} else {
554-
messagesRead++;
553+
metricsCollector.consume(1);
555554
}
556555
numRecordsInBatch--;
557556
offset++; // works even for unsigned long
@@ -564,7 +563,6 @@ static int handleDeliver(
564563
}
565564
}
566565
}
567-
metricsCollector.consume(messagesRead);
568566
return read;
569567
}
570568

0 commit comments

Comments
 (0)