diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java index d6905c38f8..35651bae3d 100644 --- a/src/main/java/com/rabbitmq/client/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java @@ -58,9 +58,9 @@ default void basicPublishUnrouted(Channel channel) { void basicAck(Channel channel, long deliveryTag, boolean multiple); - void basicNack(Channel channel, long deliveryTag); + void basicNack(Channel channel, long deliveryTag, boolean requeue); - void basicReject(Channel channel, long deliveryTag); + void basicReject(Channel channel, long deliveryTag, boolean requeue); void basicConsume(Channel channel, String consumerTag, boolean autoAck); diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java index cb65674e33..340b8bca8d 100644 --- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java @@ -46,12 +46,12 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) { } @Override - public void basicNack(Channel channel, long deliveryTag) { + public void basicNack(Channel channel, long deliveryTag, boolean requeue) { } @Override - public void basicReject(Channel channel, long deliveryTag) { + public void basicReject(Channel channel, long deliveryTag, boolean requeue) { } diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index ca3132f1ab..944b70939d 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -43,7 +43,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector { private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage(); - private final Runnable markRejectedMessageAction = () -> markRejectedMessage(); + private final Function markRejectedMessageAction = requeue -> () -> markRejectedMessage(requeue); private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged(); @@ -236,18 +236,18 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) { } @Override - public void basicNack(Channel channel, long deliveryTag) { + public void basicNack(Channel channel, long deliveryTag, boolean requeue) { try { - updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction); + updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue)); } catch(Exception e) { LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage()); } } @Override - public void basicReject(Channel channel, long deliveryTag) { + public void basicReject(Channel channel, long deliveryTag, boolean requeue) { try { - updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction); + updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue)); } catch(Exception e) { LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage()); } @@ -409,7 +409,7 @@ private ChannelState(Channel channel) { /** * Marks the event of a rejected message. */ - protected abstract void markRejectedMessage(); + protected abstract void markRejectedMessage(boolean requeue); /** * Marks the event of a message publishing acknowledgement. diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 8b24f5e43a..03c68caab3 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -1213,7 +1213,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException { transmit(new Basic.Nack(deliveryTag, multiple, requeue)); - metricsCollector.basicNack(this, deliveryTag); + metricsCollector.basicNack(this, deliveryTag, requeue); } /** Public API - {@inheritDoc} */ @@ -1222,7 +1222,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException { transmit(new Basic.Reject(deliveryTag, requeue)); - metricsCollector.basicReject(this, deliveryTag); + metricsCollector.basicReject(this, deliveryTag, requeue); } /** Public API - {@inheritDoc} */ diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java index 7cbf73c9a9..5fd28ebc03 100644 --- a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java @@ -63,6 +63,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector { private final Counter rejectedMessages; + private final Counter requeuedPublishedMessages; + public MicrometerMetricsCollector(MeterRegistry registry) { this(registry, "rabbitmq"); } @@ -90,6 +92,7 @@ public MicrometerMetricsCollector(Function metricsCreator) { this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES); this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES); this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES); + this.requeuedPublishedMessages = (Counter) metricsCreator.apply(REQUEUED_PUBLISHED_MESSAGES); } @Override @@ -133,7 +136,10 @@ protected void markAcknowledgedMessage() { } @Override - protected void markRejectedMessage() { + protected void markRejectedMessage(boolean requeue) { + if (requeue) { + requeuedPublishedMessages.increment(); + } rejectedMessages.increment(); } @@ -252,6 +258,12 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { Object create(MeterRegistry registry, String prefix, Iterable tags) { return registry.counter(prefix + ".unrouted_published", tags); } + }, + REQUEUED_PUBLISHED_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".requeued_published", tags); + } }; abstract Object create(MeterRegistry registry, String prefix, Iterable tags); diff --git a/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java index d3cc4067da..f00fea662d 100644 --- a/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java @@ -46,6 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector { private final LongCounter ackedPublishedMessagesCounter; private final LongCounter nackedPublishedMessagesCounter; private final LongCounter unroutedPublishedMessagesCounter; + private final LongCounter requeuedPublishedMessagesCounter; public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) { this(openTelemetry, "rabbitmq"); @@ -123,6 +124,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St .setUnit("{messages}") .setDescription("The number of un-routed published messages to the RabbitMQ server") .build(); + + // requeuedPublishedMessages + this.requeuedPublishedMessagesCounter = meter.counterBuilder(prefix + ".requeued_published") + .setUnit("{messages}") + .setDescription("The number of re-queued published messages to the RabbitMQ server") + .build(); } @Override @@ -166,7 +173,10 @@ protected void markAcknowledgedMessage() { } @Override - protected void markRejectedMessage() { + protected void markRejectedMessage(boolean requeue) { + if (requeue) { + requeuedPublishedMessagesCounter.add(1L, attributes); + } rejectedMessagesCounter.add(1L, attributes); } diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java index a4ddfc24f4..2405994461 100644 --- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java @@ -45,6 +45,7 @@ public class StandardMetricsCollector extends AbstractMetricsCollector { private final Meter publishAcknowledgedMessages; private final Meter publishNacknowledgedMessages; private final Meter publishUnroutedMessages; + private final Meter requeuedPublishedMessages; public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { @@ -59,6 +60,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { this.consumedMessages = registry.meter(metricsPrefix+".consumed"); this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged"); this.rejectedMessages = registry.meter(metricsPrefix+".rejected"); + this.requeuedPublishedMessages = registry.meter(metricsPrefix+".requeued_published"); } public StandardMetricsCollector() { @@ -110,7 +112,7 @@ protected void markAcknowledgedMessage() { } @Override - protected void markRejectedMessage() { + protected void markRejectedMessage(boolean requeue) { rejectedMessages.mark(); } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java index ccfa59be00..c1c9cb9e4b 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java @@ -126,7 +126,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throw return; } transmit(new Basic.Nack(realTag, multiple, requeue)); - metricsCollector.basicNack(this, deliveryTag); + metricsCollector.basicNack(this, deliveryTag, requeue); } @Override @@ -137,7 +137,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException { long realTag = deliveryTag - activeDeliveryTagOffset; if (realTag > 0) { transmit(new Basic.Reject(realTag, requeue)); - metricsCollector.basicReject(this, deliveryTag); + metricsCollector.basicReject(this, deliveryTag, requeue); } }