Skip to content

Commit 28c7d51

Browse files
committed
Polish support for new "requeued" count metric
Ensure backward compatibility, use "requeued" name, update copyright year, add test.
1 parent 1d0b4cb commit 28c7d51

File tree

7 files changed

+175
-31
lines changed

7 files changed

+175
-31
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -58,9 +58,17 @@ default void basicPublishUnrouted(Channel channel) {
5858

5959
void basicAck(Channel channel, long deliveryTag, boolean multiple);
6060

61-
void basicNack(Channel channel, long deliveryTag, boolean requeue);
61+
void basicNack(Channel channel, long deliveryTag);
6262

63-
void basicReject(Channel channel, long deliveryTag, boolean requeue);
63+
default void basicNack(Channel channel, long deliveryTag, boolean requeue) {
64+
this.basicNack(channel, deliveryTag);
65+
}
66+
67+
void basicReject(Channel channel, long deliveryTag);
68+
69+
default void basicReject(Channel channel, long deliveryTag, boolean requeue) {
70+
this.basicReject(channel, deliveryTag);
71+
}
6472

6573
void basicConsume(Channel channel, String consumerTag, boolean autoAck);
6674

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -45,11 +45,21 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
4545

4646
}
4747

48+
@Override
49+
public void basicNack(Channel channel, long deliveryTag) {
50+
51+
}
52+
4853
@Override
4954
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
5055

5156
}
5257

58+
@Override
59+
public void basicReject(Channel channel, long deliveryTag) {
60+
61+
}
62+
5363
@Override
5464
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
5565

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -43,7 +43,7 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
4343

4444
private final Runnable markAcknowledgedMessageAction = () -> markAcknowledgedMessage();
4545

46-
private final Function<Boolean, Runnable> markRejectedMessageAction = requeue -> () -> markRejectedMessage(requeue);
46+
private final Function<Boolean, Runnable> markRejectedMessageAction;
4747

4848
private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged();
4949

@@ -53,6 +53,12 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
5353

5454
private static final Function<ChannelState, Set<Long>> GET_UNCONFIRMED_DTAGS = channelState -> channelState.unconfirmedMessageDeliveryTags;
5555

56+
public AbstractMetricsCollector() {
57+
Runnable rejectRequeue = () -> markRejectedMessage(true);
58+
Runnable rejectNoRequeue = () -> markRejectedMessage(false);
59+
this.markRejectedMessageAction = requeue -> requeue ? rejectRequeue : rejectNoRequeue;
60+
}
61+
5662
@Override
5763
public void newConnection(final Connection connection) {
5864
try {
@@ -235,6 +241,11 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
235241
}
236242
}
237243

244+
@Override
245+
public void basicNack(Channel channel, long deliveryTag) {
246+
// replaced by #basicNack(Channel, long, boolean)
247+
}
248+
238249
@Override
239250
public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
240251
try {
@@ -244,6 +255,11 @@ public void basicNack(Channel channel, long deliveryTag, boolean requeue) {
244255
}
245256
}
246257

258+
@Override
259+
public void basicReject(Channel channel, long deliveryTag) {
260+
// replaced by #basicReject(Channel, long, boolean)
261+
}
262+
247263
@Override
248264
public void basicReject(Channel channel, long deliveryTag, boolean requeue) {
249265
try {
@@ -406,10 +422,19 @@ private ChannelState(Channel channel) {
406422
*/
407423
protected abstract void markAcknowledgedMessage();
408424

425+
/**
426+
* Marks the event of a rejected message.
427+
*
428+
* @deprecated Use {@link #markRejectedMessage(boolean)} instead
429+
*/
430+
protected abstract void markRejectedMessage();
431+
409432
/**
410433
* Marks the event of a rejected message.
411434
*/
412-
protected abstract void markRejectedMessage(boolean requeue);
435+
protected void markRejectedMessage(boolean requeue) {
436+
this.markRejectedMessage();
437+
}
413438

414439
/**
415440
* Marks the event of a message publishing acknowledgement.

src/main/java/com/rabbitmq/client/impl/MicrometerMetricsCollector.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class MicrometerMetricsCollector extends AbstractMetricsCollector {
6363

6464
private final Counter rejectedMessages;
6565

66-
private final Counter requeuedPublishedMessages;
66+
private final Counter requeuedMessages;
6767

6868
public MicrometerMetricsCollector(MeterRegistry registry) {
6969
this(registry, "rabbitmq");
@@ -92,7 +92,7 @@ public MicrometerMetricsCollector(Function<Metrics, Object> metricsCreator) {
9292
this.ackedPublishedMessages = (Counter) metricsCreator.apply(ACKED_PUBLISHED_MESSAGES);
9393
this.nackedPublishedMessages = (Counter) metricsCreator.apply(NACKED_PUBLISHED_MESSAGES);
9494
this.unroutedPublishedMessages = (Counter) metricsCreator.apply(UNROUTED_PUBLISHED_MESSAGES);
95-
this.requeuedPublishedMessages = (Counter) metricsCreator.apply(REQUEUED_PUBLISHED_MESSAGES);
95+
this.requeuedMessages = (Counter) metricsCreator.apply(REQUEUED_MESSAGES);
9696
}
9797

9898
@Override
@@ -135,10 +135,16 @@ protected void markAcknowledgedMessage() {
135135
acknowledgedMessages.increment();
136136
}
137137

138+
@Override
139+
@SuppressWarnings("deprecation")
140+
protected void markRejectedMessage() {
141+
142+
}
143+
138144
@Override
139145
protected void markRejectedMessage(boolean requeue) {
140146
if (requeue) {
141-
requeuedPublishedMessages.increment();
147+
requeuedMessages.increment();
142148
}
143149
rejectedMessages.increment();
144150
}
@@ -198,6 +204,10 @@ public Counter getRejectedMessages() {
198204
return rejectedMessages;
199205
}
200206

207+
public Counter getRequeuedMessages() {
208+
return requeuedMessages;
209+
}
210+
201211
public enum Metrics {
202212
CONNECTIONS {
203213
@Override
@@ -235,6 +245,12 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
235245
return registry.counter(prefix + ".rejected", tags);
236246
}
237247
},
248+
REQUEUED_MESSAGES {
249+
@Override
250+
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
251+
return registry.counter(prefix + ".requeued", tags);
252+
}
253+
},
238254
FAILED_TO_PUBLISH_MESSAGES {
239255
@Override
240256
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
@@ -258,12 +274,6 @@ Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
258274
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
259275
return registry.counter(prefix + ".unrouted_published", tags);
260276
}
261-
},
262-
REQUEUED_PUBLISHED_MESSAGES {
263-
@Override
264-
Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags) {
265-
return registry.counter(prefix + ".requeued_published", tags);
266-
}
267277
};
268278

269279
abstract Object create(MeterRegistry registry, String prefix, Iterable<Tag> tags);

src/main/java/com/rabbitmq/client/impl/OpenTelemetryMetricsCollector.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2023-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -46,7 +46,7 @@ public class OpenTelemetryMetricsCollector extends AbstractMetricsCollector {
4646
private final LongCounter ackedPublishedMessagesCounter;
4747
private final LongCounter nackedPublishedMessagesCounter;
4848
private final LongCounter unroutedPublishedMessagesCounter;
49-
private final LongCounter requeuedPublishedMessagesCounter;
49+
private final LongCounter requeuedMessagesCounter;
5050

5151
public OpenTelemetryMetricsCollector(OpenTelemetry openTelemetry) {
5252
this(openTelemetry, "rabbitmq");
@@ -101,6 +101,12 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
101101
.setDescription("The number of messages rejected from the RabbitMQ server")
102102
.build();
103103

104+
// requeuedPublishedMessages
105+
this.requeuedMessagesCounter = meter.counterBuilder(prefix + ".requeued")
106+
.setUnit("{messages}")
107+
.setDescription("The number of re-queued messages to the RabbitMQ server")
108+
.build();
109+
104110
// failedToPublishMessages
105111
this.failedToPublishMessagesCounter = meter.counterBuilder(prefix + ".failed_to_publish")
106112
.setUnit("{messages}")
@@ -124,12 +130,6 @@ public OpenTelemetryMetricsCollector(final OpenTelemetry openTelemetry, final St
124130
.setUnit("{messages}")
125131
.setDescription("The number of un-routed published messages to the RabbitMQ server")
126132
.build();
127-
128-
// requeuedPublishedMessages
129-
this.requeuedPublishedMessagesCounter = meter.counterBuilder(prefix + ".requeued_published")
130-
.setUnit("{messages}")
131-
.setDescription("The number of re-queued published messages to the RabbitMQ server")
132-
.build();
133133
}
134134

135135
@Override
@@ -172,10 +172,16 @@ protected void markAcknowledgedMessage() {
172172
acknowledgedMessagesCounter.add(1L, attributes);
173173
}
174174

175+
@Override
176+
@SuppressWarnings("deprecation")
177+
protected void markRejectedMessage() {
178+
179+
}
180+
175181
@Override
176182
protected void markRejectedMessage(boolean requeue) {
177183
if (requeue) {
178-
requeuedPublishedMessagesCounter.add(1L, attributes);
184+
requeuedMessagesCounter.add(1L, attributes);
179185
}
180186
rejectedMessagesCounter.add(1L, attributes);
181187
}

src/main/java/com/rabbitmq/client/impl/StandardMetricsCollector.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2007-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
1+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
22
//
33
// This software, the RabbitMQ Java client library, is triple-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), the GNU General Public License version 2
@@ -41,12 +41,11 @@ public class StandardMetricsCollector extends AbstractMetricsCollector {
4141
private final Meter consumedMessages;
4242
private final Meter acknowledgedMessages;
4343
private final Meter rejectedMessages;
44+
private final Meter requeuedMessages;
4445
private final Meter failedToPublishMessages;
4546
private final Meter publishAcknowledgedMessages;
4647
private final Meter publishNacknowledgedMessages;
4748
private final Meter publishUnroutedMessages;
48-
private final Meter requeuedPublishedMessages;
49-
5049

5150
public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
5251
this.registry = registry;
@@ -60,7 +59,7 @@ public StandardMetricsCollector(MetricRegistry registry, String metricsPrefix) {
6059
this.consumedMessages = registry.meter(metricsPrefix+".consumed");
6160
this.acknowledgedMessages = registry.meter(metricsPrefix+".acknowledged");
6261
this.rejectedMessages = registry.meter(metricsPrefix+".rejected");
63-
this.requeuedPublishedMessages = registry.meter(metricsPrefix+".requeued_published");
62+
this.requeuedMessages = registry.meter(metricsPrefix+".requeued");
6463
}
6564

6665
public StandardMetricsCollector() {
@@ -111,8 +110,17 @@ protected void markAcknowledgedMessage() {
111110
acknowledgedMessages.mark();
112111
}
113112

113+
@Override
114+
@SuppressWarnings("deprecation")
115+
protected void markRejectedMessage() {
116+
117+
}
118+
114119
@Override
115120
protected void markRejectedMessage(boolean requeue) {
121+
if (requeue) {
122+
requeuedMessages.mark();
123+
}
116124
rejectedMessages.mark();
117125
}
118126

@@ -159,6 +167,10 @@ public Meter getRejectedMessages() {
159167
return rejectedMessages;
160168
}
161169

170+
public Meter getRequeuedMessages() {
171+
return this.requeuedMessages;
172+
}
173+
162174
public Meter getFailedToPublishMessages() {
163175
return failedToPublishMessages;
164176
}

0 commit comments

Comments
 (0)