43
43
import com .rabbitmq .client .ConfirmListener ;
44
44
import com .rabbitmq .client .Connection ;
45
45
import com .rabbitmq .client .ConnectionFactory ;
46
+ import com .rabbitmq .client .ConsumerCancelledException ;
46
47
import com .rabbitmq .client .Envelope ;
47
48
import com .rabbitmq .client .MessageProperties ;
48
49
import com .rabbitmq .client .QueueingConsumer ;
@@ -108,12 +109,10 @@ public static void main(String[] args) {
108
109
flags .contains ("persistent" ),
109
110
exclusive , autoDelete ,
110
111
null ).getQueue ();
111
- QueueingConsumer consumer = new QueueingConsumer (channel );
112
112
if (prefetchCount > 0 ) channel .basicQos (prefetchCount );
113
- channel .basicConsume (qName , autoAck , consumer );
114
113
channel .queueBind (qName , exchangeName , id );
115
114
Thread t =
116
- new Thread (new Consumer (consumer , id ,
115
+ new Thread (new Consumer (channel , id , qName ,
117
116
consumerTxSize , autoAck ,
118
117
stats , timeLimit ));
119
118
consumerThreads [i ] = t ;
@@ -419,18 +418,21 @@ private byte[] createMessage(int sequenceNumber)
419
418
public static class Consumer implements Runnable {
420
419
421
420
private QueueingConsumer q ;
421
+ private Channel channel ;
422
422
private String id ;
423
+ private String queueName ;
423
424
private int txSize ;
424
425
private boolean autoAck ;
425
426
private Stats stats ;
426
427
private long timeLimit ;
427
428
428
- public Consumer (QueueingConsumer q , String id ,
429
- int txSize , boolean autoAck ,
429
+ public Consumer (Channel channel , String id ,
430
+ String queueName , int txSize , boolean autoAck ,
430
431
Stats stats , int timeLimit ) {
431
432
432
- this .q = q ;
433
+ this .channel = channel ;
433
434
this .id = id ;
435
+ this .queueName = queueName ;
434
436
this .txSize = txSize ;
435
437
this .autoAck = autoAck ;
436
438
this .stats = stats ;
@@ -444,17 +446,24 @@ public void run() {
444
446
startTime = now = System .currentTimeMillis ();
445
447
int totalMsgCount = 0 ;
446
448
447
- Channel channel = q .getChannel ();
448
-
449
449
try {
450
+ q = new QueueingConsumer (channel );
451
+ channel .basicConsume (queueName , autoAck , q );
450
452
451
453
while (timeLimit == 0 || now < startTime + timeLimit ) {
452
454
Delivery delivery ;
453
- if (timeLimit == 0 ) {
454
- delivery = q .nextDelivery ();
455
- } else {
456
- delivery = q .nextDelivery (startTime + timeLimit - now );
457
- if (delivery == null ) break ;
455
+ try {
456
+ if (timeLimit == 0 ) {
457
+ delivery = q .nextDelivery ();
458
+ } else {
459
+ delivery = q .nextDelivery (startTime + timeLimit - now );
460
+ if (delivery == null ) break ;
461
+ }
462
+ } catch (ConsumerCancelledException e ) {
463
+ System .out .println ("Consumer cancelled by broker. Re-consuming." );
464
+ q = new QueueingConsumer (channel );
465
+ channel .basicConsume (queueName , autoAck , q );
466
+ continue ;
458
467
}
459
468
totalMsgCount ++;
460
469
0 commit comments