Skip to content

Commit 28f507a

Browse files
committed
Handle multiple publisher confirms in metrics
This is follow-up to #354. Fixes #372
1 parent 8bc24bd commit 28f507a

File tree

5 files changed

+102
-38
lines changed

5 files changed

+102
-38
lines changed

src/main/java/com/rabbitmq/client/MetricsCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public interface MetricsCollector {
3434

3535
void closeChannel(Channel channel);
3636

37-
void basicPublish(Channel channel);
37+
void basicPublish(Channel channel, long deliveryTag);
3838

3939
void basicPublishFailure(Channel channel, Throwable cause);
4040

src/main/java/com/rabbitmq/client/NoOpMetricsCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void basicCancel(Channel channel, String consumerTag) {
6666
}
6767

6868
@Override
69-
public void basicPublish(Channel channel) {
69+
public void basicPublish(Channel channel, long deliveryTag) {
7070

7171
}
7272

src/main/java/com/rabbitmq/client/impl/AbstractMetricsCollector.java

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.concurrent.ConcurrentMap;
2525
import java.util.concurrent.locks.Lock;
2626
import java.util.concurrent.locks.ReentrantLock;
27+
import java.util.function.Function;
2728

2829
/**
2930
* Base class for {@link MetricsCollector}.
@@ -44,6 +45,14 @@ public abstract class AbstractMetricsCollector implements MetricsCollector {
4445

4546
private final Runnable markRejectedMessageAction = () -> markRejectedMessage();
4647

48+
private final Runnable markMessagePublishAcknowledgedAction = () -> markMessagePublishAcknowledged();
49+
50+
private final Runnable markMessagePublishNotAcknowledgedAction = () -> markMessagePublishNotAcknowledged();
51+
52+
private static final Function<ChannelState, Set<Long>> GET_UNACKED_DTAGS = channelState -> channelState.unackedMessageDeliveryTags;
53+
54+
private static final Function<ChannelState, Set<Long>> GET_UNCONFIRMED_DTAGS = channelState -> channelState.unconfirmedMessageDeliveryTags;
55+
4756
@Override
4857
public void newConnection(final Connection connection) {
4958
try {
@@ -94,8 +103,17 @@ public void closeChannel(Channel channel) {
94103
}
95104

96105
@Override
97-
public void basicPublish(Channel channel) {
106+
public void basicPublish(Channel channel, long deliveryTag) {
98107
try {
108+
if (deliveryTag != 0) {
109+
ChannelState channelState = channelState(channel);
110+
channelState.lock.lock();
111+
try {
112+
channelState(channel).unconfirmedMessageDeliveryTags.add(deliveryTag);
113+
} finally {
114+
channelState.lock.unlock();
115+
}
116+
}
99117
markPublishedMessage();
100118
} catch(Exception e) {
101119
LOGGER.info("Error while computing metrics in basicPublish: " + e.getMessage());
@@ -113,26 +131,19 @@ public void basicPublishFailure(Channel channel, Throwable cause) {
113131

114132
@Override
115133
public void basicPublishAck(Channel channel, long deliveryTag, boolean multiple) {
116-
if (multiple) {
117-
// this is a naive approach, as it does not handle multiple nacks
118-
return;
119-
}
120134
try {
121-
markMessagePublishAcknowledged();
122-
} catch(Exception e) {
135+
updateChannelStateAfterAckReject(channel, deliveryTag, multiple, GET_UNCONFIRMED_DTAGS, markMessagePublishAcknowledgedAction);
136+
} catch (Exception e) {
137+
e.printStackTrace();
123138
LOGGER.info("Error while computing metrics in basicPublishAck: " + e.getMessage());
124139
}
125140
}
126141

127142
@Override
128143
public void basicPublishNack(Channel channel, long deliveryTag, boolean multiple) {
129-
if (multiple) {
130-
// this is a naive approach, as it does not handle multiple nacks
131-
return;
132-
}
133144
try {
134-
markMessagePublishNotAcknowledged();
135-
} catch(Exception e) {
145+
updateChannelStateAfterAckReject(channel, deliveryTag, multiple, GET_UNCONFIRMED_DTAGS, markMessagePublishNotAcknowledgedAction);
146+
} catch (Exception e) {
136147
LOGGER.info("Error while computing metrics in basicPublishNack: " + e.getMessage());
137148
}
138149
}
@@ -217,7 +228,7 @@ public void consumedMessage(Channel channel, long deliveryTag, String consumerTa
217228
@Override
218229
public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
219230
try {
220-
updateChannelStateAfterAckReject(channel, deliveryTag, multiple, markAcknowledgedMessageAction);
231+
updateChannelStateAfterAckReject(channel, deliveryTag, multiple, GET_UNACKED_DTAGS, markAcknowledgedMessageAction);
221232
} catch(Exception e) {
222233
LOGGER.info("Error while computing metrics in basicAck: " + e.getMessage());
223234
}
@@ -226,7 +237,7 @@ public void basicAck(Channel channel, long deliveryTag, boolean multiple) {
226237
@Override
227238
public void basicNack(Channel channel, long deliveryTag) {
228239
try {
229-
updateChannelStateAfterAckReject(channel, deliveryTag, true, markRejectedMessageAction);
240+
updateChannelStateAfterAckReject(channel, deliveryTag, true, GET_UNACKED_DTAGS, markRejectedMessageAction);
230241
} catch(Exception e) {
231242
LOGGER.info("Error while computing metrics in basicNack: " + e.getMessage());
232243
}
@@ -235,18 +246,19 @@ public void basicNack(Channel channel, long deliveryTag) {
235246
@Override
236247
public void basicReject(Channel channel, long deliveryTag) {
237248
try {
238-
updateChannelStateAfterAckReject(channel, deliveryTag, false, markRejectedMessageAction);
249+
updateChannelStateAfterAckReject(channel, deliveryTag, false, GET_UNACKED_DTAGS, markRejectedMessageAction);
239250
} catch(Exception e) {
240251
LOGGER.info("Error while computing metrics in basicReject: " + e.getMessage());
241252
}
242253
}
243254

244-
private void updateChannelStateAfterAckReject(Channel channel, long deliveryTag, boolean multiple, Runnable action) {
255+
private void updateChannelStateAfterAckReject(Channel channel, long deliveryTag, boolean multiple,
256+
Function<ChannelState, Set<Long>> dtags, Runnable action) {
245257
ChannelState channelState = channelState(channel);
246258
channelState.lock.lock();
247259
try {
248260
if(multiple) {
249-
Iterator<Long> iterator = channelState.unackedMessageDeliveryTags.iterator();
261+
Iterator<Long> iterator = dtags.apply(channelState).iterator();
250262
while(iterator.hasNext()) {
251263
long messageDeliveryTag = iterator.next();
252264
if(messageDeliveryTag <= deliveryTag) {
@@ -255,7 +267,7 @@ private void updateChannelStateAfterAckReject(Channel channel, long deliveryTag,
255267
}
256268
}
257269
} else {
258-
if (channelState.unackedMessageDeliveryTags.remove(deliveryTag)) {
270+
if (dtags.apply(channelState).remove(deliveryTag)) {
259271
action.run();
260272
}
261273
}
@@ -329,6 +341,7 @@ private static class ChannelState {
329341

330342
final Set<Long> unackedMessageDeliveryTags = new HashSet<Long>();
331343
final Set<String> consumersWithManualAck = new HashSet<String>();
344+
final Set<Long> unconfirmedMessageDeliveryTags = new HashSet<>();
332345

333346
final Channel channel;
334347

src/main/java/com/rabbitmq/client/impl/ChannelN.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -692,9 +692,13 @@ public void basicPublish(String exchange, String routingKey,
692692
BasicProperties props, byte[] body)
693693
throws IOException
694694
{
695+
final long deliveryTag;
695696
if (nextPublishSeqNo > 0) {
696-
unconfirmedSet.add(getNextPublishSeqNo());
697+
deliveryTag = getNextPublishSeqNo();
698+
unconfirmedSet.add(deliveryTag);
697699
nextPublishSeqNo++;
700+
} else {
701+
deliveryTag = 0;
698702
}
699703
if (props == null) {
700704
props = MessageProperties.MINIMAL_BASIC;
@@ -712,7 +716,7 @@ public void basicPublish(String exchange, String routingKey,
712716
metricsCollector.basicPublishFailure(this, e);
713717
throw e;
714718
}
715-
metricsCollector.basicPublish(this);
719+
metricsCollector.basicPublish(this, deliveryTag);
716720
}
717721

718722
/** Public API - {@inheritDoc} */

src/test/java/com/rabbitmq/client/test/MetricsCollectorTest.java

Lines changed: 62 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,15 @@ public void basicGetAndAck() {
132132
assertThat(failedToPublishMessages(metrics), is(1L));
133133
assertThat(publishedMessages(metrics), is(0L));
134134

135-
metrics.basicPublish(channel);
135+
metrics.basicPublish(channel, 0L);
136136
assertThat(failedToPublishMessages(metrics), is(1L));
137137
assertThat(publishedMessages(metrics), is(1L));
138138

139139
metrics.basicPublishFailure(channel, new IOException());
140140
assertThat(failedToPublishMessages(metrics), is(2L));
141141
assertThat(publishedMessages(metrics), is(1L));
142142

143-
metrics.basicPublish(channel);
143+
metrics.basicPublish(channel, 0L);
144144
assertThat(failedToPublishMessages(metrics), is(2L));
145145
assertThat(publishedMessages(metrics), is(2L));
146146

@@ -150,43 +150,90 @@ public void basicGetAndAck() {
150150
}
151151

152152
@Test public void publishingAcknowledgements() {
153-
long anyDeliveryTag = 123L;
154153
AbstractMetricsCollector metrics = factory.create();
154+
Connection connection = mock(Connection.class);
155+
when(connection.getId()).thenReturn("connection-1");
155156
Channel channel = mock(Channel.class);
157+
when(channel.getConnection()).thenReturn(connection);
158+
when(channel.getChannelNumber()).thenReturn(1);
159+
160+
metrics.newConnection(connection);
161+
metrics.newChannel(channel);
162+
156163
// begins with no messages acknowledged
157164
assertThat(publishAck(metrics), is(0L));
158165
// first acknowledgement gets tracked
159-
metrics.basicPublishAck(channel, anyDeliveryTag, false);
166+
metrics.basicPublish(channel, 1);
167+
metrics.basicPublishAck(channel, 1, false);
160168
assertThat(publishAck(metrics), is(1L));
161169
// second acknowledgement gets tracked
162-
metrics.basicPublishAck(channel, anyDeliveryTag, false);
170+
metrics.basicPublish(channel, 2);
171+
metrics.basicPublishAck(channel, 2, false);
163172
assertThat(publishAck(metrics), is(2L));
164-
// multiple deliveries aren't tracked
165-
metrics.basicPublishAck(channel, anyDeliveryTag, true);
173+
174+
// this is idempotent
175+
metrics.basicPublishAck(channel, 2, false);
166176
assertThat(publishAck(metrics), is(2L));
177+
178+
// multi-ack
179+
metrics.basicPublish(channel, 3);
180+
metrics.basicPublish(channel, 4);
181+
metrics.basicPublish(channel, 5);
182+
// ack-ing in the middle
183+
metrics.basicPublishAck(channel, 4, false);
184+
assertThat(publishAck(metrics), is(3L));
185+
// ack-ing several at once
186+
metrics.basicPublishAck(channel, 5, true);
187+
assertThat(publishAck(metrics), is(5L));
188+
189+
// ack-ing non existent doesn't affect metrics
190+
metrics.basicPublishAck(channel, 123, true);
191+
assertThat(publishAck(metrics), is(5L));
192+
167193
// cleaning stale state doesn't affect the metric
168194
metrics.cleanStaleState();
169-
assertThat(publishAck(metrics), is(2L));
195+
assertThat(publishAck(metrics), is(5L));
170196
}
171197

172198
@Test public void publishingNotAcknowledgements() {
173-
long anyDeliveryTag = 123L;
174199
AbstractMetricsCollector metrics = factory.create();
200+
Connection connection = mock(Connection.class);
201+
when(connection.getId()).thenReturn("connection-1");
175202
Channel channel = mock(Channel.class);
203+
when(channel.getConnection()).thenReturn(connection);
204+
when(channel.getChannelNumber()).thenReturn(1);
205+
206+
metrics.newConnection(connection);
207+
metrics.newChannel(channel);
176208
// begins with no messages not-acknowledged
177209
assertThat(publishNack(metrics), is(0L));
178210
// first not-acknowledgement gets tracked
179-
metrics.basicPublishNack(channel, anyDeliveryTag, false);
211+
metrics.basicPublish(channel, 1);
212+
metrics.basicPublishNack(channel, 1, false);
180213
assertThat(publishNack(metrics), is(1L));
181214
// second not-acknowledgement gets tracked
182-
metrics.basicPublishNack(channel, anyDeliveryTag, false);
183-
assertThat(publishNack(metrics), is(2L));
184-
// multiple deliveries aren't tracked
185-
metrics.basicPublishNack(channel, anyDeliveryTag, true);
215+
metrics.basicPublish(channel, 2);
216+
metrics.basicPublishNack(channel, 2, false);
186217
assertThat(publishNack(metrics), is(2L));
218+
219+
// multi-nack
220+
metrics.basicPublish(channel, 3);
221+
metrics.basicPublish(channel, 4);
222+
metrics.basicPublish(channel, 5);
223+
// ack-ing in the middle
224+
metrics.basicPublishNack(channel, 4, false);
225+
assertThat(publishNack(metrics), is(3L));
226+
// ack-ing several at once
227+
metrics.basicPublishNack(channel, 5, true);
228+
assertThat(publishNack(metrics), is(5L));
229+
230+
// ack-ing non existent doesn't affect metrics
231+
metrics.basicPublishNack(channel, 123, true);
232+
assertThat(publishNack(metrics), is(5L));
233+
187234
// cleaning stale state doesn't affect the metric
188235
metrics.cleanStaleState();
189-
assertThat(publishNack(metrics), is(2L));
236+
assertThat(publishNack(metrics), is(5L));
190237
}
191238

192239
@Test public void publishingUnrouted() {

0 commit comments

Comments
 (0)