46
46
import com .rabbitmq .client .ConnectionFactory ;
47
47
import com .rabbitmq .client .Envelope ;
48
48
import com .rabbitmq .client .MessageProperties ;
49
+ import com .rabbitmq .client .NackListener ;
49
50
import com .rabbitmq .client .QueueingConsumer ;
50
51
import com .rabbitmq .client .ReturnListener ;
51
52
import com .rabbitmq .client .ShutdownSignalException ;
@@ -140,6 +141,7 @@ public static void main(String[] args) {
140
141
confirm , confirmMax );
141
142
channel .setReturnListener (p );
142
143
channel .setAckListener (p );
144
+ channel .setNackListener (p );
143
145
Thread t = new Thread (p );
144
146
producerThreads [i ] = t ;
145
147
t .start ();
@@ -214,7 +216,8 @@ private static List lstArg(CommandLine cmd, char opt) {
214
216
return Arrays .asList (vals );
215
217
}
216
218
217
- public static class Producer implements Runnable , ReturnListener , AckListener {
219
+ public static class Producer implements Runnable , ReturnListener , AckListener ,
220
+ NackListener {
218
221
219
222
private Channel channel ;
220
223
private String exchangeName ;
@@ -236,6 +239,7 @@ public static class Producer implements Runnable, ReturnListener, AckListener {
236
239
237
240
private boolean confirm ;
238
241
private long confirmCount ;
242
+ private long nackCount ;
239
243
private Semaphore confirmPool ;
240
244
private volatile SortedSet <Long > ackSet =
241
245
Collections .synchronizedSortedSet (new TreeSet <Long >());
@@ -274,6 +278,15 @@ public synchronized void handleBasicReturn(int replyCode,
274
278
}
275
279
276
280
public void handleAck (long seqNo , boolean multiple ) {
281
+ handleAckNack (seqNo , multiple , false );
282
+ }
283
+
284
+ public void handleNack (long seqNo , boolean multiple ) {
285
+ handleAckNack (seqNo , multiple , true );
286
+ }
287
+
288
+ private void handleAckNack (long seqNo , boolean multiple ,
289
+ boolean nack ) {
277
290
int numConfirms = 0 ;
278
291
if (multiple ) {
279
292
SortedSet <Long > confirmed = ackSet .headSet (seqNo + 1 );
@@ -284,14 +297,19 @@ public void handleAck(long seqNo, boolean multiple) {
284
297
numConfirms = 1 ;
285
298
}
286
299
synchronized (this ) {
287
- confirmCount += numConfirms ;
300
+ if (nack ) {
301
+ nackCount += numConfirms ;
302
+ } else {
303
+ confirmCount += numConfirms ;
304
+ }
288
305
}
289
306
290
307
if (confirmPool != null ) {
291
308
for (int i = 0 ; i < numConfirms ; ++i ) {
292
309
confirmPool .release ();
293
310
}
294
311
}
312
+
295
313
}
296
314
297
315
public void run () {
@@ -354,21 +372,26 @@ private void delay(long now)
354
372
Thread .sleep (pause );
355
373
}
356
374
if (elapsed > interval ) {
357
- long sendRate , returnRate , confirmRate ;
375
+ long sendRate , returnRate , confirmRate , nackRate ;
358
376
synchronized (this ) {
359
377
sendRate = msgCount * 1000L / elapsed ;
360
378
returnRate = returnCount * 1000L / elapsed ;
361
379
confirmRate = confirmCount * 1000L / elapsed ;
380
+ nackRate = nackCount * 1000L / elapsed ;
362
381
msgCount = 0 ;
363
382
returnCount = 0 ;
364
383
confirmCount = 0 ;
384
+ nackCount = 0 ;
365
385
}
366
386
System .out .print ("sending rate: " + sendRate + " msg/s" );
367
387
if (mandatory || immediate ) {
368
388
System .out .print (", returns: " + returnRate + " ret/s" );
369
389
}
370
390
if (confirm ) {
371
391
System .out .print (", confirms: " + confirmRate + " c/s" );
392
+ if (nackRate > 0 ) {
393
+ System .out .print (", nacks: " + nackRate + " n/s" );
394
+ }
372
395
}
373
396
System .out .println ();
374
397
lastStatsTime = now ;
0 commit comments