Skip to content

Commit afd97f7

Browse files
authored
Merge pull request #1476 from rabbitmq/ohksj77-distinguish-nack-metrics-by-requeue-flag
Add requeued message count metric
2 parents dea4b6a + 28c7d51 commit afd97f7

File tree

9 files changed

+183
-15
lines changed

9 files changed

+183
-15
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -60,8 +60,16 @@ default void basicPublishUnrouted(Channel channel) {
6060

6161
void basicNack(Channel channel, long deliveryTag);
6262

63+
default void basicNack(Channel channel, long deliveryTag, boolean requeue) {
64+
this.basicNack(channel, deliveryTag);
65+
}
66+
6367
void basicReject(Channel channel, long deliveryTag);
6468

69+
default void basicReject(Channel channel, long deliveryTag, boolean requeue) {
70+
this.basicReject(channel, deliveryTag);
71+
}
72+
6573
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
6674

6775
void basicCancel(Channel channel, String consumerTag);

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -50,11 +50,21 @@ public void basicNack(Channel channel, long deliveryTag) {
5050

5151
}
5252

53+
@Override
54+
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
55+
56+
}
57+
5358
@Override
5459
public void basicReject(Channel channel, long deliveryTag) {
5560

5661
}
5762

63+
@Override
64+
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
65+
66+
}
67+
5868
@Override
5969
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
6070

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -43,7 +43,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
4343

4444
private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage();
4545

46-
private final Runnable markRejectedMessageAction = () -> markRejectedMessage();
46+
private final Function<Boolean, Runnable> markRejectedMessageAction;
4747

4848
private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged();
4949

@@ -53,6 +53,12 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
5353

5454
private static final Function<ChannelState, Set<Long>> GET_UNCONFIRMED_DTAGS = channelState -> channelState.unconfirmedMessageDeliveryTags;
5555

56+
public AbstractMetricsCollector() {
57+
Runnable rejectRequeue = () -> markRejectedMessage(true);
58+
Runnable rejectNoRequeue = () -> markRejectedMessage(false);
59+
this.markRejectedMessageAction = requeue -> requeue ? rejectRequeue : rejectNoRequeue;
60+
}
61+
5662
@Override
5763
public void newConnection(final Connection connection) {
5864
try {
@@ -237,17 +243,27 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
237243

238244
@Override
239245
public void basicNack(Channel channel, long deliveryTag) {
246+
// replaced by #basicNack(Channel, long, boolean)
247+
}
248+
249+
@Override
250+
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
240251
try {
241-
updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction);
252+
updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
242253
} catch(Exception e) {
243254
LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage());
244255
}
245256
}
246257

247258
@Override
248259
public void basicReject(Channel channel, long deliveryTag) {
260+
// replaced by #basicReject(Channel, long, boolean)
261+
}
262+
263+
@Override
264+
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
249265
try {
250-
updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction);
266+
updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
251267
} catch(Exception e) {
252268
LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage());
253269
}
@@ -408,9 +424,18 @@ private ChannelState(Channel channel) {
408424

409425
/**
410426
* Marks the event of a rejected message.
427+
*
428+
* @deprecated Use {@link #markRejectedMessage(boolean)} instead
411429
*/
412430
protected abstract void markRejectedMessage();
413431

432+
/**
433+
* Marks the event of a rejected message.
434+
*/
435+
protected void markRejectedMessage(boolean requeue) {
436+
this.markRejectedMessage();
437+
}
438+
414439
/**
415440
* Marks the event of a message publishing acknowledgement.
416441
*/

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,7 +1213,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
12131213
throws IOException
12141214
{
12151215
transmit(new Basic.Nack(deliveryTag, multiple, requeue));
1216-
metricsCollector.basicNack(this, deliveryTag);
1216+
metricsCollector.basicNack(this, deliveryTag, requeue);
12171217
}
12181218

12191219
/** Public API - {@inheritDoc} */
@@ -1222,7 +1222,7 @@ public void basicReject(long deliveryTag, boolean requeue)
12221222
throws IOException
12231223
{
12241224
transmit(new Basic.Reject(deliveryTag, requeue));
1225-
metricsCollector.basicReject(this, deliveryTag);
1225+
metricsCollector.basicReject(this, deliveryTag, requeue);
12261226
}
12271227

12281228
/** Public API - {@inheritDoc} */

src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {
6363

6464
private final Counter rejectedMessages;
6565

66+
private final Counter requeuedMessages;
67+
6668
public MicrometerMetricsCollector(MeterRegistry registry) {
6769
this(registry, "rabbitmq");
6870
}
@@ -90,6 +92,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
9092
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
9193
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
9294
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
95+
this.requeuedMessages = (Counter) metricsCreator.apply(REQUEUED_MESSAGES);
9396
}
9497

9598
@Override
@@ -133,7 +136,16 @@ protected void markAcknowledgedMessage() {
133136
}
134137

135138
@Override
139+
@SuppressWarnings("deprecation")
136140
protected void markRejectedMessage() {
141+
142+
}
143+
144+
@Override
145+
protected void markRejectedMessage(boolean requeue) {
146+
if (requeue) {
147+
requeuedMessages.increment();
148+
}
137149
rejectedMessages.increment();
138150
}
139151

@@ -192,6 +204,10 @@ public Counter getRejectedMessages() {
192204
return rejectedMessages;
193205
}
194206

207+
public Counter getRequeuedMessages() {
208+
return requeuedMessages;
209+
}
210+
195211
public enum Metrics {
196212
CONNECTIONS {
197213
@Override
@@ -229,6 +245,12 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
229245
return registry.counter(prefix + ".rejected", tags);
230246
}
231247
},
248+
REQUEUED_MESSAGES {
249+
@Override
250+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
251+
return registry.counter(prefix + ".requeued", tags);
252+
}
253+
},
232254
FAILED_TO_PUBLISH_MESSAGES {
233255
@Override
234256
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {

src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -46,6 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
4646
private final LongCounter ackedPublishedMessagesCounter;
4747
private final LongCounter nackedPublishedMessagesCounter;
4848
private final LongCounter unroutedPublishedMessagesCounter;
49+
private final LongCounter requeuedMessagesCounter;
4950

5051
public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
5152
this(openTelemetry, "rabbitmq");
@@ -100,6 +101,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
100101
.setDescription("The number of messages rejected from the RabbitMQ server")
101102
.build();
102103

104+
// requeuedPublishedMessages
105+
this.requeuedMessagesCounter = meter.counterBuilder(prefix + ".requeued")
106+
.setUnit("{messages}")
107+
.setDescription("The number of re-queued messages to the RabbitMQ server")
108+
.build();
109+
103110
// failedToPublishMessages
104111
this.failedToPublishMessagesCounter = meter.counterBuilder(prefix + ".failed_to_publish")
105112
.setUnit("{messages}")
@@ -166,7 +173,16 @@ protected void markAcknowledgedMessage() {
166173
}
167174

168175
@Override
176+
@SuppressWarnings("deprecation")
169177
protected void markRejectedMessage() {
178+
179+
}
180+
181+
@Override
182+
protected void markRejectedMessage(boolean requeue) {
183+
if (requeue) {
184+
requeuedMessagesCounter.add(1L, attributes);
185+
}
170186
rejectedMessagesCounter.add(1L, attributes);
171187
}
172188

src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -41,12 +41,12 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
4141
private final Meter consumedMessages;
4242
private final Meter acknowledgedMessages;
4343
private final Meter rejectedMessages;
44+
private final Meter requeuedMessages;
4445
private final Meter failedToPublishMessages;
4546
private final Meter publishAcknowledgedMessages;
4647
private final Meter publishNacknowledgedMessages;
4748
private final Meter publishUnroutedMessages;
4849

49-
5050
public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
5151
this.registry = registry;
5252
this.connections = registry.counter(metricsPrefix+".connections");
@@ -59,6 +59,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
5959
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
6060
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
6161
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
62+
this.requeuedMessages = registry.meter(metricsPrefix+".requeued");
6263
}
6364

6465
public StandardMetricsCollector() {
@@ -110,7 +111,16 @@ protected void markAcknowledgedMessage() {
110111
}
111112

112113
@Override
114+
@SuppressWarnings("deprecation")
113115
protected void markRejectedMessage() {
116+
117+
}
118+
119+
@Override
120+
protected void markRejectedMessage(boolean requeue) {
121+
if (requeue) {
122+
requeuedMessages.mark();
123+
}
114124
rejectedMessages.mark();
115125
}
116126

@@ -157,6 +167,10 @@ public Meter getRejectedMessages() {
157167
return rejectedMessages;
158168
}
159169

170+
public Meter getRequeuedMessages() {
171+
return this.requeuedMessages;
172+
}
173+
160174
public Meter getFailedToPublishMessages() {
161175
return failedToPublishMessages;
162176
}

src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throw
126126
return;
127127
}
128128
transmit(new Basic.Nack(realTag, multiple, requeue));
129-
metricsCollector.basicNack(this, deliveryTag);
129+
metricsCollector.basicNack(this, deliveryTag, requeue);
130130
}
131131

132132
@Override
@@ -137,7 +137,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException {
137137
long realTag = deliveryTag - activeDeliveryTagOffset;
138138
if (realTag > 0) {
139139
transmit(new Basic.Reject(realTag, requeue));
140-
metricsCollector.basicReject(this, deliveryTag);
140+
metricsCollector.basicReject(this, deliveryTag, requeue);
141141
}
142142
}
143143

0 commit comments

Comments
 (0)