Skip to content

Commit 92b944d

Browse files
committed
Distinguish nack metrics by requeue flag
1 parent c493ad6 commit 92b944d

8 files changed

+41
-17
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ default void basicPublishUnrouted(Channel channel) {
5858

5959
void basicAck(Channel channel, long deliveryTag, boolean multiple);
6060

61-
void basicNack(Channel channel, long deliveryTag);
61+
void basicNack(Channel channel, long deliveryTag, boolean requeue);
6262

63-
void basicReject(Channel channel, long deliveryTag);
63+
void basicReject(Channel channel, long deliveryTag, boolean requeue);
6464

6565
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
6666

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,12 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
4646
}
4747

4848
@Override
49-
public void basicNack(Channel channel, long deliveryTag) {
49+
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
5050

5151
}
5252

5353
@Override
54-
public void basicReject(Channel channel, long deliveryTag) {
54+
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
5555

5656
}
5757

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
4343

4444
private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage();
4545

46-
private final Runnable markRejectedMessageAction = () -> markRejectedMessage();
46+
private final Function<Boolean, Runnable> markRejectedMessageAction = requeue -> () -> markRejectedMessage(requeue);
4747

4848
private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged();
4949

@@ -236,18 +236,18 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
236236
}
237237

238238
@Override
239-
public void basicNack(Channel channel, long deliveryTag) {
239+
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
240240
try {
241-
updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction);
241+
updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
242242
} catch(Exception e) {
243243
LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage());
244244
}
245245
}
246246

247247
@Override
248-
public void basicReject(Channel channel, long deliveryTag) {
248+
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
249249
try {
250-
updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction);
250+
updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
251251
} catch(Exception e) {
252252
LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage());
253253
}
@@ -409,7 +409,7 @@ private ChannelState(Channel channel) {
409409
/**
410410
* Marks the event of a rejected message.
411411
*/
412-
protected abstract void markRejectedMessage();
412+
protected abstract void markRejectedMessage(boolean requeue);
413413

414414
/**
415415
* Marks the event of a message publishing acknowledgement.

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,7 +1213,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
12131213
throws IOException
12141214
{
12151215
transmit(new Basic.Nack(deliveryTag, multiple, requeue));
1216-
metricsCollector.basicNack(this, deliveryTag);
1216+
metricsCollector.basicNack(this, deliveryTag, requeue);
12171217
}
12181218

12191219
/** Public API - {@inheritDoc} */
@@ -1222,7 +1222,7 @@ public void basicReject(long deliveryTag, boolean requeue)
12221222
throws IOException
12231223
{
12241224
transmit(new Basic.Reject(deliveryTag, requeue));
1225-
metricsCollector.basicReject(this, deliveryTag);
1225+
metricsCollector.basicReject(this, deliveryTag, requeue);
12261226
}
12271227

12281228
/** Public API - {@inheritDoc} */

src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {
6363

6464
private final Counter rejectedMessages;
6565

66+
private final Counter requeuedPublishedMessages;
67+
6668
public MicrometerMetricsCollector(MeterRegistry registry) {
6769
this(registry, "rabbitmq");
6870
}
@@ -90,6 +92,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
9092
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
9193
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
9294
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
95+
this.requeuedPublishedMessages = (Counter) metricsCreator.apply(REQUEUED_PUBLISHED_MESSAGES);
9396
}
9497

9598
@Override
@@ -133,7 +136,10 @@ protected void markAcknowledgedMessage() {
133136
}
134137

135138
@Override
136-
protected void markRejectedMessage() {
139+
protected void markRejectedMessage(boolean requeue) {
140+
if (requeue) {
141+
requeuedPublishedMessages.increment();
142+
}
137143
rejectedMessages.increment();
138144
}
139145

@@ -252,6 +258,12 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
252258
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
253259
return registry.counter(prefix + ".unrouted_published", tags);
254260
}
261+
},
262+
REQUEUED_PUBLISHED_MESSAGES {
263+
@Override
264+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
265+
return registry.counter(prefix + ".requeued_published", tags);
266+
}
255267
};
256268

257269
abstract Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags);

src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
4646
private final LongCounter ackedPublishedMessagesCounter;
4747
private final LongCounter nackedPublishedMessagesCounter;
4848
private final LongCounter unroutedPublishedMessagesCounter;
49+
private final LongCounter requeuedPublishedMessagesCounter;
4950

5051
public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
5152
this(openTelemetry, "rabbitmq");
@@ -123,6 +124,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
123124
.setUnit("{messages}")
124125
.setDescription("The number of un-routed published messages to the RabbitMQ server")
125126
.build();
127+
128+
// requeuedPublishedMessages
129+
this.requeuedPublishedMessagesCounter = meter.counterBuilder(prefix + ".requeued_published")
130+
.setUnit("{messages}")
131+
.setDescription("The number of re-queued published messages to the RabbitMQ server")
132+
.build();
126133
}
127134

128135
@Override
@@ -166,7 +173,10 @@ protected void markAcknowledgedMessage() {
166173
}
167174

168175
@Override
169-
protected void markRejectedMessage() {
176+
protected void markRejectedMessage(boolean requeue) {
177+
if (requeue) {
178+
requeuedPublishedMessagesCounter.add(1L, attributes);
179+
}
170180
rejectedMessagesCounter.add(1L, attributes);
171181
}
172182

src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
4545
private final Meter publishAcknowledgedMessages;
4646
private final Meter publishNacknowledgedMessages;
4747
private final Meter publishUnroutedMessages;
48+
private final Meter requeuedPublishedMessages;
4849

4950

5051
public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
@@ -59,6 +60,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
5960
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
6061
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
6162
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
63+
this.requeuedPublishedMessages = registry.meter(metricsPrefix+".requeued_published");
6264
}
6365

6466
public StandardMetricsCollector() {
@@ -110,7 +112,7 @@ protected void markAcknowledgedMessage() {
110112
}
111113

112114
@Override
113-
protected void markRejectedMessage() {
115+
protected void markRejectedMessage(boolean requeue) {
114116
rejectedMessages.mark();
115117
}
116118

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throw
126126
return;
127127
}
128128
transmit(new Basic.Nack(realTag, multiple, requeue));
129-
metricsCollector.basicNack(this, deliveryTag);
129+
metricsCollector.basicNack(this, deliveryTag, requeue);
130130
}
131131

132132
@Override
@@ -137,7 +137,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException {
137137
long realTag = deliveryTag - activeDeliveryTagOffset;
138138
if (realTag > 0) {
139139
transmit(new Basic.Reject(realTag, requeue));
140-
metricsCollector.basicReject(this, deliveryTag);
140+
metricsCollector.basicReject(this, deliveryTag, requeue);
141141
}
142142
}
143143

0 commit comments

Comments
 (0)