From 92b944d90ecb07d6798283a8178e84fbfcf97641 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=EC=8A=B9=EC=A7=84?= Date: Sun, 10 Nov 2024 20:27:31 +0900 Subject: [PATCH 1/5] Distinguish nack metrics by requeue flag --- .../java/com/rabbitmq/client/MetricsCollector.java | 4 ++-- .../com/rabbitmq/client/NoOpMetricsCollector.java | 4 ++-- .../client/impl/AbstractMetricsCollector.java | 12 ++++++------ .../java/com/rabbitmq/client/impl/ChannelN.java | 4 ++-- .../client/impl/MicrometerMetricsCollector.java | 14 +++++++++++++- .../client/impl/OpenTelemetryMetricsCollector.java | 12 +++++++++++- .../client/impl/StandardMetricsCollector.java | 4 +++- .../impl/recovery/RecoveryAwareChannelN.java | 4 ++-- 8 files changed, 41 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java index d6905c38f8..35651bae3d 100644 --- a/src/main/java/com/rabbitmq/client/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java @@ -58,9 +58,9 @@ default void basicPublishUnrouted(Channel channel) { void basicAck(Channel channel, long deliveryTag, boolean multiple); - void basicNack(Channel channel, long deliveryTag); + void basicNack(Channel channel, long deliveryTag, boolean requeue); - void basicReject(Channel channel, long deliveryTag); + void basicReject(Channel channel, long deliveryTag, boolean requeue); void basicConsume(Channel channel, String consumerTag, boolean autoAck); diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java index cb65674e33..340b8bca8d 100644 --- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java @@ -46,12 +46,12 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) { } @Override - public void basicNack(Channel channel, long deliveryTag) { + public void basicNack(Channel channel, long deliveryTag, boolean requeue) { } @Override - public void basicReject(Channel channel, long deliveryTag) { + public void basicReject(Channel channel, long deliveryTag, boolean requeue) { } diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index ca3132f1ab..944b70939d 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -43,7 +43,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector { private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage(); - private final Runnable markRejectedMessageAction = () -> markRejectedMessage(); + private final Function markRejectedMessageAction = requeue -> () -> markRejectedMessage(requeue); private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged(); @@ -236,18 +236,18 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) { } @Override - public void basicNack(Channel channel, long deliveryTag) { + public void basicNack(Channel channel, long deliveryTag, boolean requeue) { try { - updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction); + updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue)); } catch(Exception e) { LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage()); } } @Override - public void basicReject(Channel channel, long deliveryTag) { + public void basicReject(Channel channel, long deliveryTag, boolean requeue) { try { - updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction); + updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue)); } catch(Exception e) { LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage()); } @@ -409,7 +409,7 @@ private ChannelState(Channel channel) { /** * Marks the event of a rejected message. */ - protected abstract void markRejectedMessage(); + protected abstract void markRejectedMessage(boolean requeue); /** * Marks the event of a message publishing acknowledgement. diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java index 8b24f5e43a..03c68caab3 100644 --- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java @@ -1213,7 +1213,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException { transmit(new Basic.Nack(deliveryTag, multiple, requeue)); - metricsCollector.basicNack(this, deliveryTag); + metricsCollector.basicNack(this, deliveryTag, requeue); } /** Public API - {@inheritDoc} */ @@ -1222,7 +1222,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException { transmit(new Basic.Reject(deliveryTag, requeue)); - metricsCollector.basicReject(this, deliveryTag); + metricsCollector.basicReject(this, deliveryTag, requeue); } /** Public API - {@inheritDoc} */ diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java index 7cbf73c9a9..5fd28ebc03 100644 --- a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java @@ -63,6 +63,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector { private final Counter rejectedMessages; + private final Counter requeuedPublishedMessages; + public MicrometerMetricsCollector(MeterRegistry registry) { this(registry, "rabbitmq"); } @@ -90,6 +92,7 @@ public MicrometerMetricsCollector(Function metricsCreator) { this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES); this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES); this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES); + this.requeuedPublishedMessages = (Counter) metricsCreator.apply(REQUEUED_PUBLISHED_MESSAGES); } @Override @@ -133,7 +136,10 @@ protected void markAcknowledgedMessage() { } @Override - protected void markRejectedMessage() { + protected void markRejectedMessage(boolean requeue) { + if (requeue) { + requeuedPublishedMessages.increment(); + } rejectedMessages.increment(); } @@ -252,6 +258,12 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { Object create(MeterRegistry registry, String prefix, Iterable tags) { return registry.counter(prefix + ".unrouted_published", tags); } + }, + REQUEUED_PUBLISHED_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".requeued_published", tags); + } }; abstract Object create(MeterRegistry registry, String prefix, Iterable tags); diff --git a/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java index d3cc4067da..f00fea662d 100644 --- a/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java @@ -46,6 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector { private final LongCounter ackedPublishedMessagesCounter; private final LongCounter nackedPublishedMessagesCounter; private final LongCounter unroutedPublishedMessagesCounter; + private final LongCounter requeuedPublishedMessagesCounter; public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) { this(openTelemetry, "rabbitmq"); @@ -123,6 +124,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St .setUnit("{messages}") .setDescription("The number of un-routed published messages to the RabbitMQ server") .build(); + + // requeuedPublishedMessages + this.requeuedPublishedMessagesCounter = meter.counterBuilder(prefix + ".requeued_published") + .setUnit("{messages}") + .setDescription("The number of re-queued published messages to the RabbitMQ server") + .build(); } @Override @@ -166,7 +173,10 @@ protected void markAcknowledgedMessage() { } @Override - protected void markRejectedMessage() { + protected void markRejectedMessage(boolean requeue) { + if (requeue) { + requeuedPublishedMessagesCounter.add(1L, attributes); + } rejectedMessagesCounter.add(1L, attributes); } diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java index a4ddfc24f4..2405994461 100644 --- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java @@ -45,6 +45,7 @@ public class StandardMetricsCollector extends AbstractMetricsCollector { private final Meter publishAcknowledgedMessages; private final Meter publishNacknowledgedMessages; private final Meter publishUnroutedMessages; + private final Meter requeuedPublishedMessages; public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { @@ -59,6 +60,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { this.consumedMessages = registry.meter(metricsPrefix+".consumed"); this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged"); this.rejectedMessages = registry.meter(metricsPrefix+".rejected"); + this.requeuedPublishedMessages = registry.meter(metricsPrefix+".requeued_published"); } public StandardMetricsCollector() { @@ -110,7 +112,7 @@ protected void markAcknowledgedMessage() { } @Override - protected void markRejectedMessage() { + protected void markRejectedMessage(boolean requeue) { rejectedMessages.mark(); } diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java index ccfa59be00..c1c9cb9e4b 100644 --- a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java +++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java @@ -126,7 +126,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throw return; } transmit(new Basic.Nack(realTag, multiple, requeue)); - metricsCollector.basicNack(this, deliveryTag); + metricsCollector.basicNack(this, deliveryTag, requeue); } @Override @@ -137,7 +137,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException { long realTag = deliveryTag - activeDeliveryTagOffset; if (realTag > 0) { transmit(new Basic.Reject(realTag, requeue)); - metricsCollector.basicReject(this, deliveryTag); + metricsCollector.basicReject(this, deliveryTag, requeue); } } From cef7690142663a03d48036ae74598db72d526cba Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Nov 2024 00:06:33 +0000 Subject: [PATCH 2/5] Bump opentelemetry.version from 1.43.0 to 1.44.1 Bumps `opentelemetry.version` from 1.43.0 to 1.44.1. Updates `io.opentelemetry:opentelemetry-api` from 1.43.0 to 1.44.1 - [Release notes](https://github.com/open-telemetry/opentelemetry-java/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-java/blob/v1.44.1/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-java/compare/v1.43.0...v1.44.1) Updates `io.opentelemetry:opentelemetry-sdk-testing` from 1.43.0 to 1.44.1 - [Release notes](https://github.com/open-telemetry/opentelemetry-java/releases) - [Changelog](https://github.com/open-telemetry/opentelemetry-java/blob/v1.44.1/CHANGELOG.md) - [Commits](https://github.com/open-telemetry/opentelemetry-java/compare/v1.43.0...v1.44.1) --- updated-dependencies: - dependency-name: io.opentelemetry:opentelemetry-api dependency-type: direct:production update-type: version-update:semver-minor - dependency-name: io.opentelemetry:opentelemetry-sdk-testing dependency-type: direct:development update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6b734d3425..c2968c0b52 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ 1.7.36 4.2.28 1.13.6 - 1.43.0 + 1.44.1 2.18.1 1.2.13 5.11.3 From b824a757b8dd12f43cb93d2b41b65c51a66352d7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 12 Nov 2024 00:37:18 +0000 Subject: [PATCH 3/5] Bump io.micrometer:micrometer-tracing-integration-test Bumps [io.micrometer:micrometer-tracing-integration-test](https://github.com/micrometer-metrics/tracing) from 1.3.5 to 1.3.6. - [Release notes](https://github.com/micrometer-metrics/tracing/releases) - [Commits](https://github.com/micrometer-metrics/tracing/compare/v1.3.5...v1.3.6) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-tracing-integration-test dependency-type: direct:development update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c2968c0b52..dfa5fffff2 100644 --- a/pom.xml +++ b/pom.xml @@ -64,7 +64,7 @@ 5.11.3 5.14.2 3.26.3 - 1.3.5 + 1.3.6 1.0.4 9.4.56.v20240826 1.79 From 1d0b4cb234334abb0d9a30698831c5423a280bdc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 12 Nov 2024 00:37:01 +0000 Subject: [PATCH 4/5] Bump io.micrometer:micrometer-core from 1.13.6 to 1.13.7 Bumps [io.micrometer:micrometer-core](https://github.com/micrometer-metrics/micrometer) from 1.13.6 to 1.13.7. - [Release notes](https://github.com/micrometer-metrics/micrometer/releases) - [Commits](https://github.com/micrometer-metrics/micrometer/compare/v1.13.6...v1.13.7) --- updated-dependencies: - dependency-name: io.micrometer:micrometer-core dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index dfa5fffff2..52a0329e34 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ true 1.7.36 4.2.28 - 1.13.6 + 1.13.7 1.44.1 2.18.1 1.2.13 From 28c7d515d28744fa296a3c92b8a08927eea72b53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 12 Nov 2024 11:20:17 +0100 Subject: [PATCH 5/5] Polish support for new "requeued" count metric Ensure backward compatibility, use "requeued" name, update copyright year, add test. --- .../com/rabbitmq/client/MetricsCollector.java | 14 +++- .../rabbitmq/client/NoOpMetricsCollector.java | 12 ++- .../client/impl/AbstractMetricsCollector.java | 31 +++++++- .../impl/MicrometerMetricsCollector.java | 28 ++++--- .../impl/OpenTelemetryMetricsCollector.java | 24 +++--- .../client/impl/StandardMetricsCollector.java | 20 ++++- .../client/test/MetricsCollectorTest.java | 77 ++++++++++++++++++- 7 files changed, 175 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java index 35651bae3d..812574c4d1 100644 --- a/src/main/java/com/rabbitmq/client/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -58,9 +58,17 @@ default void basicPublishUnrouted(Channel channel) { void basicAck(Channel channel, long deliveryTag, boolean multiple); - void basicNack(Channel channel, long deliveryTag, boolean requeue); + void basicNack(Channel channel, long deliveryTag); - void basicReject(Channel channel, long deliveryTag, boolean requeue); + default void basicNack(Channel channel, long deliveryTag, boolean requeue) { + this.basicNack(channel, deliveryTag); + } + + void basicReject(Channel channel, long deliveryTag); + + default void basicReject(Channel channel, long deliveryTag, boolean requeue) { + this.basicReject(channel, deliveryTag); + } void basicConsume(Channel channel, String consumerTag, boolean autoAck); diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java index 340b8bca8d..0b703f0edd 100644 --- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -45,11 +45,21 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) { } + @Override + public void basicNack(Channel channel, long deliveryTag) { + + } + @Override public void basicNack(Channel channel, long deliveryTag, boolean requeue) { } + @Override + public void basicReject(Channel channel, long deliveryTag) { + + } + @Override public void basicReject(Channel channel, long deliveryTag, boolean requeue) { diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java index 944b70939d..74df1d72b7 100644 --- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -43,7 +43,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector { private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage(); - private final Function markRejectedMessageAction = requeue -> () -> markRejectedMessage(requeue); + private final Function markRejectedMessageAction; private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged(); @@ -53,6 +53,12 @@ public abstract class AbstractMetricsCollector implements MetricsCollector { private static final Function> GET_UNCONFIRMED_DTAGS = channelState -> channelState.unconfirmedMessageDeliveryTags; + public AbstractMetricsCollector() { + Runnable rejectRequeue = () -> markRejectedMessage(true); + Runnable rejectNoRequeue = () -> markRejectedMessage(false); + this.markRejectedMessageAction = requeue -> requeue ? rejectRequeue : rejectNoRequeue; + } + @Override public void newConnection(final Connection connection) { try { @@ -235,6 +241,11 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) { } } + @Override + public void basicNack(Channel channel, long deliveryTag) { + // replaced by #basicNack(Channel, long, boolean) + } + @Override public void basicNack(Channel channel, long deliveryTag, boolean requeue) { try { @@ -244,6 +255,11 @@ public void basicNack(Channel channel, long deliveryTag, boolean requeue) { } } + @Override + public void basicReject(Channel channel, long deliveryTag) { + // replaced by #basicReject(Channel, long, boolean) + } + @Override public void basicReject(Channel channel, long deliveryTag, boolean requeue) { try { @@ -406,10 +422,19 @@ private ChannelState(Channel channel) { */ protected abstract void markAcknowledgedMessage(); + /** + * Marks the event of a rejected message. + * + * @deprecated Use {@link #markRejectedMessage(boolean)} instead + */ + protected abstract void markRejectedMessage(); + /** * Marks the event of a rejected message. */ - protected abstract void markRejectedMessage(boolean requeue); + protected void markRejectedMessage(boolean requeue) { + this.markRejectedMessage(); + } /** * Marks the event of a message publishing acknowledgement. diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java index 5fd28ebc03..5ebf3389d2 100644 --- a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java @@ -63,7 +63,7 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector { private final Counter rejectedMessages; - private final Counter requeuedPublishedMessages; + private final Counter requeuedMessages; public MicrometerMetricsCollector(MeterRegistry registry) { this(registry, "rabbitmq"); @@ -92,7 +92,7 @@ public MicrometerMetricsCollector(Function metricsCreator) { this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES); this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES); this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES); - this.requeuedPublishedMessages = (Counter) metricsCreator.apply(REQUEUED_PUBLISHED_MESSAGES); + this.requeuedMessages = (Counter) metricsCreator.apply(REQUEUED_MESSAGES); } @Override @@ -135,10 +135,16 @@ protected void markAcknowledgedMessage() { acknowledgedMessages.increment(); } + @Override + @SuppressWarnings("deprecation") + protected void markRejectedMessage() { + + } + @Override protected void markRejectedMessage(boolean requeue) { if (requeue) { - requeuedPublishedMessages.increment(); + requeuedMessages.increment(); } rejectedMessages.increment(); } @@ -198,6 +204,10 @@ public Counter getRejectedMessages() { return rejectedMessages; } + public Counter getRequeuedMessages() { + return requeuedMessages; + } + public enum Metrics { CONNECTIONS { @Override @@ -235,6 +245,12 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { return registry.counter(prefix + ".rejected", tags); } }, + REQUEUED_MESSAGES { + @Override + Object create(MeterRegistry registry, String prefix, Iterable tags) { + return registry.counter(prefix + ".requeued", tags); + } + }, FAILED_TO_PUBLISH_MESSAGES { @Override Object create(MeterRegistry registry, String prefix, Iterable tags) { @@ -258,12 +274,6 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) { Object create(MeterRegistry registry, String prefix, Iterable tags) { return registry.counter(prefix + ".unrouted_published", tags); } - }, - REQUEUED_PUBLISHED_MESSAGES { - @Override - Object create(MeterRegistry registry, String prefix, Iterable tags) { - return registry.counter(prefix + ".requeued_published", tags); - } }; abstract Object create(MeterRegistry registry, String prefix, Iterable tags); diff --git a/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java index f00fea662d..404ac5a9b0 100644 --- a/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java @@ -1,4 +1,4 @@ -// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -46,7 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector { private final LongCounter ackedPublishedMessagesCounter; private final LongCounter nackedPublishedMessagesCounter; private final LongCounter unroutedPublishedMessagesCounter; - private final LongCounter requeuedPublishedMessagesCounter; + private final LongCounter requeuedMessagesCounter; public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) { this(openTelemetry, "rabbitmq"); @@ -101,6 +101,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St .setDescription("The number of messages rejected from the RabbitMQ server") .build(); + // requeuedPublishedMessages + this.requeuedMessagesCounter = meter.counterBuilder(prefix + ".requeued") + .setUnit("{messages}") + .setDescription("The number of re-queued messages to the RabbitMQ server") + .build(); + // failedToPublishMessages this.failedToPublishMessagesCounter = meter.counterBuilder(prefix + ".failed_to_publish") .setUnit("{messages}") @@ -124,12 +130,6 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St .setUnit("{messages}") .setDescription("The number of un-routed published messages to the RabbitMQ server") .build(); - - // requeuedPublishedMessages - this.requeuedPublishedMessagesCounter = meter.counterBuilder(prefix + ".requeued_published") - .setUnit("{messages}") - .setDescription("The number of re-queued published messages to the RabbitMQ server") - .build(); } @Override @@ -172,10 +172,16 @@ protected void markAcknowledgedMessage() { acknowledgedMessagesCounter.add(1L, attributes); } + @Override + @SuppressWarnings("deprecation") + protected void markRejectedMessage() { + + } + @Override protected void markRejectedMessage(boolean requeue) { if (requeue) { - requeuedPublishedMessagesCounter.add(1L, attributes); + requeuedMessagesCounter.add(1L, attributes); } rejectedMessagesCounter.add(1L, attributes); } diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java index 2405994461..df9c00e277 100644 --- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -41,12 +41,11 @@ public class StandardMetricsCollector extends AbstractMetricsCollector { private final Meter consumedMessages; private final Meter acknowledgedMessages; private final Meter rejectedMessages; + private final Meter requeuedMessages; private final Meter failedToPublishMessages; private final Meter publishAcknowledgedMessages; private final Meter publishNacknowledgedMessages; private final Meter publishUnroutedMessages; - private final Meter requeuedPublishedMessages; - public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { this.registry = registry; @@ -60,7 +59,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) { this.consumedMessages = registry.meter(metricsPrefix+".consumed"); this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged"); this.rejectedMessages = registry.meter(metricsPrefix+".rejected"); - this.requeuedPublishedMessages = registry.meter(metricsPrefix+".requeued_published"); + this.requeuedMessages = registry.meter(metricsPrefix+".requeued"); } public StandardMetricsCollector() { @@ -111,8 +110,17 @@ protected void markAcknowledgedMessage() { acknowledgedMessages.mark(); } + @Override + @SuppressWarnings("deprecation") + protected void markRejectedMessage() { + + } + @Override protected void markRejectedMessage(boolean requeue) { + if (requeue) { + requeuedMessages.mark(); + } rejectedMessages.mark(); } @@ -159,6 +167,10 @@ public Meter getRejectedMessages() { return rejectedMessages; } + public Meter getRequeuedMessages() { + return this.requeuedMessages; + } + public Meter getFailedToPublishMessages() { return failedToPublishMessages; } diff --git a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java index 611b412929..0858cec327 100644 --- a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java +++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Java client library, is triple-licensed under the // Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2 @@ -25,7 +25,6 @@ import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; @@ -34,6 +33,8 @@ import java.io.IOException; import java.util.List; +import java.util.function.LongConsumer; +import java.util.stream.LongStream; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; @@ -134,6 +135,54 @@ public void basicConsumeAndAck(MetricsCollectorFactory factory) { assertThat(acknowledgedMessages(metrics)).isEqualTo(1L+2L+1L); } + @ParameterizedTest + @MethodSource("data") + public void basicConsumeAndNackReject(MetricsCollectorFactory factory) { + AbstractMetricsCollector metrics = factory.create(); + Connection connection = mock(Connection.class); + when(connection.getId()).thenReturn("connection-1"); + Channel channel = mock(Channel.class); + when(channel.getConnection()).thenReturn(connection); + when(channel.getChannelNumber()).thenReturn(1); + + metrics.newConnection(connection); + metrics.newChannel(channel); + + String ctag = "1"; + metrics.basicConsume(channel, ctag, false); + + LongConsumer consumed = dtag -> metrics.consumedMessage(channel, dtag, ctag); + long count = 10; + LongStream.range(0, count).forEach(consumed::accept) ; + assertThat(consumedMessages(metrics)).isEqualTo(count); + assertThat(acknowledgedMessages(metrics)).isZero(); + + metrics.basicReject(channel, 0, false); + assertThat(acknowledgedMessages(metrics)).isZero(); + assertThat(rejectedMessages(metrics)).isEqualTo(1L); + assertThat(requeuedMessages(metrics)).isZero(); + + metrics.basicReject(channel, 1, true); + assertThat(acknowledgedMessages(metrics)).isZero(); + assertThat(rejectedMessages(metrics)).isEqualTo(2L); + assertThat(requeuedMessages(metrics)).isEqualTo(1L); + + metrics.basicNack(channel, 4, false); + assertThat(acknowledgedMessages(metrics)).isZero(); + assertThat(rejectedMessages(metrics)).isEqualTo(2L + 3L); + assertThat(requeuedMessages(metrics)).isEqualTo(1L); + + metrics.basicNack(channel, 7, true); + assertThat(acknowledgedMessages(metrics)).isZero(); + assertThat(rejectedMessages(metrics)).isEqualTo(2L + 3L + 3L); + assertThat(requeuedMessages(metrics)).isEqualTo(1L + 3L); + + metrics.basicAck(channel, 9, true); + assertThat(acknowledgedMessages(metrics)).isEqualTo(2); + assertThat(rejectedMessages(metrics)).isEqualTo(2L + 3L + 3L); + assertThat(requeuedMessages(metrics)).isEqualTo(1L + 3L); + } + @ParameterizedTest @MethodSource("data") public void publishingAndPublishingFailures(MetricsCollectorFactory factory) { @@ -400,6 +449,30 @@ else if (metrics instanceof MicrometerMetricsCollector) { } } + long rejectedMessages(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getRejectedMessages().getCount(); + } + else if (metrics instanceof MicrometerMetricsCollector) { + return (long)((MicrometerMetricsCollector) metrics).getRejectedMessages().count(); + } + else { + return getOpenTelemetryCounterMeterValue("rabbitmq.rejected"); + } + } + + long requeuedMessages(MetricsCollector metrics) { + if (metrics instanceof StandardMetricsCollector) { + return ((StandardMetricsCollector) metrics).getRequeuedMessages().getCount(); + } + else if (metrics instanceof MicrometerMetricsCollector) { + return (long)((MicrometerMetricsCollector) metrics).getRequeuedMessages().count(); + } + else { + return getOpenTelemetryCounterMeterValue("rabbitmq.requeued"); + } + } + long connections(MetricsCollector metrics) { if (metrics instanceof StandardMetricsCollector) { return ((StandardMetricsCollector) metrics).getConnections().getCount();