Skip to content

Commit 3f686fd

Browse files
committed
Merge branch '4.1.x-stable' into 4.2.x-stable
2 parents 21028f9 + 520bf70 commit 3f686fd

File tree

1 file changed

+33
-13
lines changed

1 file changed

+33
-13
lines changed

src/test/java/com/rabbitmq/client/test/functional/ConsumerPriorities.java

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515

1616
package com.rabbitmq.client.test.functional;
1717

18-
import com.rabbitmq.client.*;
1918
import com.rabbitmq.client.test.BrokerTestCase;
2019
import org.junit.Test;
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.junit.Assert.fail;
2123

2224
import java.io.IOException;
2325
import java.util.Arrays;
@@ -27,10 +29,17 @@
2729
import java.util.concurrent.LinkedBlockingQueue;
2830
import java.util.concurrent.TimeUnit;
2931

30-
import static org.junit.Assert.assertEquals;
31-
import static org.junit.Assert.fail;
32+
import java.util.concurrent.CountDownLatch;
33+
34+
import com.rabbitmq.client.DefaultConsumer;
35+
import com.rabbitmq.client.Envelope;
36+
37+
import com.rabbitmq.client.AMQP;
38+
import com.rabbitmq.client.Channel;
39+
import com.rabbitmq.client.MessageProperties;
3240

3341
public class ConsumerPriorities extends BrokerTestCase {
42+
3443
@Test public void validation() throws IOException {
3544
assertFailValidation(args("banana"));
3645
assertFailValidation(args(new HashMap<Object, Object>()));
@@ -50,6 +59,8 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
5059
}
5160

5261
private static final int COUNT = 10;
62+
private static final long DELIVERY_TIMEOUT_MS = 100;
63+
private static final long CANCEL_OK_TIMEOUT_MS = 10 * 1000;
5364

5465
@Test public void consumerPriorities() throws Exception {
5566
String queue = channel.queueDeclare().getQueue();
@@ -61,13 +72,20 @@ private void assertFailValidation(Map<String, Object> args) throws IOException {
6172
channel.basicConsume(queue, true, args(-1), lowConsumer);
6273

6374
publish(queue, COUNT, "high");
75+
assertContents(highConsumer, COUNT, "high");
6476
channel.basicCancel(high);
77+
assertTrue(
78+
"High priority consumer should have been cancelled",
79+
highConsumer.cancelLatch.await(CANCEL_OK_TIMEOUT_MS, TimeUnit.MILLISECONDS)
80+
);
6581
publish(queue, COUNT, "med");
82+
assertContents(medConsumer, COUNT, "med");
6683
channel.basicCancel(med);
84+
assertTrue(
85+
"Medium priority consumer should have been cancelled",
86+
medConsumer.cancelLatch.await(CANCEL_OK_TIMEOUT_MS, TimeUnit.MILLISECONDS)
87+
);
6788
publish(queue, COUNT, "low");
68-
69-
assertContents(highConsumer, COUNT, "high");
70-
assertContents(medConsumer, COUNT, "med");
7189
assertContents(lowConsumer, COUNT, "low");
7290
}
7391

@@ -77,12 +95,12 @@ private Map<String, Object> args(Object o) {
7795
return map;
7896
}
7997

80-
private void assertContents(QueueMessageConsumer c, int count, String msg) throws InterruptedException {
98+
private void assertContents(QueueMessageConsumer qc, int count, String msg) throws InterruptedException {
8199
for (int i = 0; i < count; i++) {
82-
byte[] body = c.nextDelivery(100);
100+
byte[] body = qc.nextDelivery(DELIVERY_TIMEOUT_MS);
83101
assertEquals(msg, new String(body));
84102
}
85-
assertEquals(null, c.nextDelivery());
103+
assertEquals(null, qc.nextDelivery(DELIVERY_TIMEOUT_MS));
86104
}
87105

88106
private void publish(String queue, int count, String msg) throws IOException {
@@ -91,10 +109,12 @@ private void publish(String queue, int count, String msg) throws IOException {
91109
}
92110
}
93111

94-
class QueueMessageConsumer extends DefaultConsumer {
112+
private static class QueueMessageConsumer extends DefaultConsumer {
95113

96114
BlockingQueue<byte[]> messages = new LinkedBlockingQueue<byte[]>();
97115

116+
CountDownLatch cancelLatch = new CountDownLatch(1);
117+
98118
public QueueMessageConsumer(Channel channel) {
99119
super(channel);
100120
}
@@ -104,14 +124,14 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
104124
messages.add(body);
105125
}
106126

107-
byte[] nextDelivery() {
108-
return messages.poll();
127+
@Override
128+
public void handleCancelOk(String consumerTag) {
129+
cancelLatch.countDown();
109130
}
110131

111132
byte[] nextDelivery(long timeoutInMs) throws InterruptedException {
112133
return messages.poll(timeoutInMs, TimeUnit.MILLISECONDS);
113134
}
114135

115136
}
116-
117137
}

0 commit comments

Comments
 (0)