diff --git a/pom.xml b/pom.xml
index 6b734d3425..52a0329e34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -57,14 +57,14 @@
true
1.7.36
4.2.28
- 1.13.6
- 1.43.0
+ 1.13.7
+ 1.44.1
2.18.1
1.2.13
5.11.3
5.14.2
3.26.3
- 1.3.5
+ 1.3.6
1.0.4
9.4.56.v20240826
1.79
diff --git a/src/main/java/com/rabbitmq/client/MetricsCollector.java b/src/main/java/com/rabbitmq/client/MetricsCollector.java
index d6905c38f8..812574c4d1 100644
--- a/src/main/java/com/rabbitmq/client/MetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/MetricsCollector.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -60,8 +60,16 @@ default void basicPublishUnrouted(Channel channel) {
void basicNack(Channel channel, long deliveryTag);
+ default void basicNack(Channel channel, long deliveryTag, boolean requeue) {
+ this.basicNack(channel, deliveryTag);
+ }
+
void basicReject(Channel channel, long deliveryTag);
+ default void basicReject(Channel channel, long deliveryTag, boolean requeue) {
+ this.basicReject(channel, deliveryTag);
+ }
+
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
void basicCancel(Channel channel, String consumerTag);
diff --git a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java
index cb65674e33..0b703f0edd 100644
--- a/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -50,11 +50,21 @@ public void basicNack(Channel channel, long deliveryTag) {
}
+ @Override
+ public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
+
+ }
+
@Override
public void basicReject(Channel channel, long deliveryTag) {
}
+ @Override
+ public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
+
+ }
+
@Override
public void basicConsume(Channel channel, String consumerTag, boolean autoAck) {
diff --git a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java
index ca3132f1ab..74df1d72b7 100644
--- a/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -43,7 +43,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage();
- private final Runnable markRejectedMessageAction = () -> markRejectedMessage();
+ private final Function markRejectedMessageAction;
private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged();
@@ -53,6 +53,12 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
private static final Function> GET_UNCONFIRMED_DTAGS = channelState -> channelState.unconfirmedMessageDeliveryTags;
+ public AbstractMetricsCollector() {
+ Runnable rejectRequeue = () -> markRejectedMessage(true);
+ Runnable rejectNoRequeue = () -> markRejectedMessage(false);
+ this.markRejectedMessageAction = requeue -> requeue ? rejectRequeue : rejectNoRequeue;
+ }
+
@Override
public void newConnection(final Connection connection) {
try {
@@ -237,8 +243,13 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
@Override
public void basicNack(Channel channel, long deliveryTag) {
+ // replaced by #basicNack(Channel, long, boolean)
+ }
+
+ @Override
+ public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
try {
- updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction);
+ updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
} catch(Exception e) {
LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage());
}
@@ -246,8 +257,13 @@ public void basicNack(Channel channel, long deliveryTag) {
@Override
public void basicReject(Channel channel, long deliveryTag) {
+ // replaced by #basicReject(Channel, long, boolean)
+ }
+
+ @Override
+ public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
try {
- updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction);
+ updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction.apply(requeue));
} catch(Exception e) {
LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage());
}
@@ -408,9 +424,18 @@ private ChannelState(Channel channel) {
/**
* Marks the event of a rejected message.
+ *
+ * @deprecated Use {@link #markRejectedMessage(boolean)} instead
*/
protected abstract void markRejectedMessage();
+ /**
+ * Marks the event of a rejected message.
+ */
+ protected void markRejectedMessage(boolean requeue) {
+ this.markRejectedMessage();
+ }
+
/**
* Marks the event of a message publishing acknowledgement.
*/
diff --git a/src/main/java/com/rabbitmq/client/impl/ChannelN.java b/src/main/java/com/rabbitmq/client/impl/ChannelN.java
index 8b24f5e43a..03c68caab3 100644
--- a/src/main/java/com/rabbitmq/client/impl/ChannelN.java
+++ b/src/main/java/com/rabbitmq/client/impl/ChannelN.java
@@ -1213,7 +1213,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException
{
transmit(new Basic.Nack(deliveryTag, multiple, requeue));
- metricsCollector.basicNack(this, deliveryTag);
+ metricsCollector.basicNack(this, deliveryTag, requeue);
}
/** Public API - {@inheritDoc} */
@@ -1222,7 +1222,7 @@ public void basicReject(long deliveryTag, boolean requeue)
throws IOException
{
transmit(new Basic.Reject(deliveryTag, requeue));
- metricsCollector.basicReject(this, deliveryTag);
+ metricsCollector.basicReject(this, deliveryTag, requeue);
}
/** Public API - {@inheritDoc} */
diff --git a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java
index 7cbf73c9a9..5ebf3389d2 100644
--- a/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java
@@ -63,6 +63,8 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {
private final Counter rejectedMessages;
+ private final Counter requeuedMessages;
+
public MicrometerMetricsCollector(MeterRegistry registry) {
this(registry, "rabbitmq");
}
@@ -90,6 +92,7 @@ public MicrometerMetricsCollector(Function metricsCreator) {
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
+ this.requeuedMessages = (Counter) metricsCreator.apply(REQUEUED_MESSAGES);
}
@Override
@@ -133,7 +136,16 @@ protected void markAcknowledgedMessage() {
}
@Override
+ @SuppressWarnings("deprecation")
protected void markRejectedMessage() {
+
+ }
+
+ @Override
+ protected void markRejectedMessage(boolean requeue) {
+ if (requeue) {
+ requeuedMessages.increment();
+ }
rejectedMessages.increment();
}
@@ -192,6 +204,10 @@ public Counter getRejectedMessages() {
return rejectedMessages;
}
+ public Counter getRequeuedMessages() {
+ return requeuedMessages;
+ }
+
public enum Metrics {
CONNECTIONS {
@Override
@@ -229,6 +245,12 @@ Object create(MeterRegistry registry, String prefix, Iterable tags) {
return registry.counter(prefix + ".rejected", tags);
}
},
+ REQUEUED_MESSAGES {
+ @Override
+ Object create(MeterRegistry registry, String prefix, Iterable tags) {
+ return registry.counter(prefix + ".requeued", tags);
+ }
+ },
FAILED_TO_PUBLISH_MESSAGES {
@Override
Object create(MeterRegistry registry, String prefix, Iterable tags) {
diff --git a/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java
index d3cc4067da..404ac5a9b0 100644
--- a/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -46,6 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
private final LongCounter ackedPublishedMessagesCounter;
private final LongCounter nackedPublishedMessagesCounter;
private final LongCounter unroutedPublishedMessagesCounter;
+ private final LongCounter requeuedMessagesCounter;
public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
this(openTelemetry, "rabbitmq");
@@ -100,6 +101,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
.setDescription("The number of messages rejected from the RabbitMQ server")
.build();
+ // requeuedPublishedMessages
+ this.requeuedMessagesCounter = meter.counterBuilder(prefix + ".requeued")
+ .setUnit("{messages}")
+ .setDescription("The number of re-queued messages to the RabbitMQ server")
+ .build();
+
// failedToPublishMessages
this.failedToPublishMessagesCounter = meter.counterBuilder(prefix + ".failed_to_publish")
.setUnit("{messages}")
@@ -166,7 +173,16 @@ protected void markAcknowledgedMessage() {
}
@Override
+ @SuppressWarnings("deprecation")
protected void markRejectedMessage() {
+
+ }
+
+ @Override
+ protected void markRejectedMessage(boolean requeue) {
+ if (requeue) {
+ requeuedMessagesCounter.add(1L, attributes);
+ }
rejectedMessagesCounter.add(1L, attributes);
}
diff --git a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java
index a4ddfc24f4..df9c00e277 100644
--- a/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java
+++ b/src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -41,12 +41,12 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
private final Meter consumedMessages;
private final Meter acknowledgedMessages;
private final Meter rejectedMessages;
+ private final Meter requeuedMessages;
private final Meter failedToPublishMessages;
private final Meter publishAcknowledgedMessages;
private final Meter publishNacknowledgedMessages;
private final Meter publishUnroutedMessages;
-
public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
this.registry = registry;
this.connections = registry.counter(metricsPrefix+".connections");
@@ -59,6 +59,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
+ this.requeuedMessages = registry.meter(metricsPrefix+".requeued");
}
public StandardMetricsCollector() {
@@ -110,7 +111,16 @@ protected void markAcknowledgedMessage() {
}
@Override
+ @SuppressWarnings("deprecation")
protected void markRejectedMessage() {
+
+ }
+
+ @Override
+ protected void markRejectedMessage(boolean requeue) {
+ if (requeue) {
+ requeuedMessages.mark();
+ }
rejectedMessages.mark();
}
@@ -157,6 +167,10 @@ public Meter getRejectedMessages() {
return rejectedMessages;
}
+ public Meter getRequeuedMessages() {
+ return this.requeuedMessages;
+ }
+
public Meter getFailedToPublishMessages() {
return failedToPublishMessages;
}
diff --git a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java
index ccfa59be00..c1c9cb9e4b 100644
--- a/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java
+++ b/src/main/java/com/rabbitmq/client/impl/recovery/RecoveryAwareChannelN.java
@@ -126,7 +126,7 @@ public void basicNack(long deliveryTag, boolean multiple, boolean requeue) throw
return;
}
transmit(new Basic.Nack(realTag, multiple, requeue));
- metricsCollector.basicNack(this, deliveryTag);
+ metricsCollector.basicNack(this, deliveryTag, requeue);
}
@Override
@@ -137,7 +137,7 @@ public void basicReject(long deliveryTag, boolean requeue) throws IOException {
long realTag = deliveryTag - activeDeliveryTagOffset;
if (realTag > 0) {
transmit(new Basic.Reject(realTag, requeue));
- metricsCollector.basicReject(this, deliveryTag);
+ metricsCollector.basicReject(this, deliveryTag, requeue);
}
}
diff --git a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
index 611b412929..0858cec327 100644
--- a/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
+++ b/src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java
@@ -1,4 +1,4 @@
-// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Java client library, is triple-licensed under the
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -25,7 +25,6 @@
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.MetricData;
-import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -34,6 +33,8 @@
import java.io.IOException;
import java.util.List;
+import java.util.function.LongConsumer;
+import java.util.stream.LongStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
@@ -134,6 +135,54 @@ public void basicConsumeAndAck(MetricsCollectorFactory factory) {
assertThat(acknowledgedMessages(metrics)).isEqualTo(1L+2L+1L);
}
+ @ParameterizedTest
+ @MethodSource("data")
+ public void basicConsumeAndNackReject(MetricsCollectorFactory factory) {
+ AbstractMetricsCollector metrics = factory.create();
+ Connection connection = mock(Connection.class);
+ when(connection.getId()).thenReturn("connection-1");
+ Channel channel = mock(Channel.class);
+ when(channel.getConnection()).thenReturn(connection);
+ when(channel.getChannelNumber()).thenReturn(1);
+
+ metrics.newConnection(connection);
+ metrics.newChannel(channel);
+
+ String ctag = "1";
+ metrics.basicConsume(channel, ctag, false);
+
+ LongConsumer consumed = dtag -> metrics.consumedMessage(channel, dtag, ctag);
+ long count = 10;
+ LongStream.range(0, count).forEach(consumed::accept) ;
+ assertThat(consumedMessages(metrics)).isEqualTo(count);
+ assertThat(acknowledgedMessages(metrics)).isZero();
+
+ metrics.basicReject(channel, 0, false);
+ assertThat(acknowledgedMessages(metrics)).isZero();
+ assertThat(rejectedMessages(metrics)).isEqualTo(1L);
+ assertThat(requeuedMessages(metrics)).isZero();
+
+ metrics.basicReject(channel, 1, true);
+ assertThat(acknowledgedMessages(metrics)).isZero();
+ assertThat(rejectedMessages(metrics)).isEqualTo(2L);
+ assertThat(requeuedMessages(metrics)).isEqualTo(1L);
+
+ metrics.basicNack(channel, 4, false);
+ assertThat(acknowledgedMessages(metrics)).isZero();
+ assertThat(rejectedMessages(metrics)).isEqualTo(2L + 3L);
+ assertThat(requeuedMessages(metrics)).isEqualTo(1L);
+
+ metrics.basicNack(channel, 7, true);
+ assertThat(acknowledgedMessages(metrics)).isZero();
+ assertThat(rejectedMessages(metrics)).isEqualTo(2L + 3L + 3L);
+ assertThat(requeuedMessages(metrics)).isEqualTo(1L + 3L);
+
+ metrics.basicAck(channel, 9, true);
+ assertThat(acknowledgedMessages(metrics)).isEqualTo(2);
+ assertThat(rejectedMessages(metrics)).isEqualTo(2L + 3L + 3L);
+ assertThat(requeuedMessages(metrics)).isEqualTo(1L + 3L);
+ }
+
@ParameterizedTest
@MethodSource("data")
public void publishingAndPublishingFailures(MetricsCollectorFactory factory) {
@@ -400,6 +449,30 @@ else if (metrics instanceof MicrometerMetricsCollector) {
}
}
+ long rejectedMessages(MetricsCollector metrics) {
+ if (metrics instanceof StandardMetricsCollector) {
+ return ((StandardMetricsCollector) metrics).getRejectedMessages().getCount();
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
+ return (long)((MicrometerMetricsCollector) metrics).getRejectedMessages().count();
+ }
+ else {
+ return getOpenTelemetryCounterMeterValue("rabbitmq.rejected");
+ }
+ }
+
+ long requeuedMessages(MetricsCollector metrics) {
+ if (metrics instanceof StandardMetricsCollector) {
+ return ((StandardMetricsCollector) metrics).getRequeuedMessages().getCount();
+ }
+ else if (metrics instanceof MicrometerMetricsCollector) {
+ return (long)((MicrometerMetricsCollector) metrics).getRequeuedMessages().count();
+ }
+ else {
+ return getOpenTelemetryCounterMeterValue("rabbitmq.requeued");
+ }
+ }
+
long connections(MetricsCollector metrics) {
if (metrics instanceof StandardMetricsCollector) {
return ((StandardMetricsCollector) metrics).getConnections().getCount();