Skip to content

Commit f864a1e

Browse files
author
Matthias Radestock
committed
test round-robining in the presence of blocking
1 parent ce26cc0 commit f864a1e

File tree

1 file changed

+52
-0
lines changed

1 file changed

+52
-0
lines changed

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,19 @@
3838
import java.util.List;
3939
import java.util.Queue;
4040

41+
import java.util.Collections;
42+
import java.util.Map;
43+
import java.util.HashMap;
44+
4145
import com.rabbitmq.client.AMQP;
4246
import com.rabbitmq.client.Channel;
4347
import com.rabbitmq.client.GetResponse;
4448
import com.rabbitmq.client.QueueingConsumer;
4549
import com.rabbitmq.client.QueueingConsumer.Delivery;
4650

51+
import com.rabbitmq.client.AMQP.BasicProperties;
52+
import com.rabbitmq.client.Envelope;
53+
4754
public class QosTests extends BrokerTestCase
4855
{
4956

@@ -201,6 +208,51 @@ public void testFairness()
201208

202209
}
203210

211+
public void testRoundRobin()
212+
throws IOException
213+
{
214+
//check that when we have multiple consumers on the same
215+
//channel & queue, and a prefetch limit set, that all
216+
//consumers get a fair share of the messages
217+
218+
channel.basicQos(1);
219+
String q = channel.queueDeclare().getQueue();
220+
channel.queueBind(q, "amq.fanout", "");
221+
222+
final Map<String, Integer> counts =
223+
Collections.synchronizedMap(new HashMap<String, Integer>());
224+
225+
QueueingConsumer c = new QueueingConsumer(channel) {
226+
@Override public void handleDelivery(String consumerTag,
227+
Envelope envelope,
228+
AMQP.BasicProperties properties,
229+
byte[] body)
230+
throws IOException {
231+
counts.put(consumerTag, counts.get(consumerTag) + 1);
232+
super.handleDelivery(consumerTag, envelope,
233+
properties, body);
234+
}
235+
};
236+
237+
channel.basicConsume(q, false, "c1", c);
238+
channel.basicConsume(q, false, "c2", c);
239+
240+
int count = 4;
241+
counts.put("c1", 0);
242+
counts.put("c2", 0);
243+
fill(count);
244+
try {
245+
for (int i = 0; i < count; i++) {
246+
Delivery d = c.nextDelivery();
247+
channel.basicAck(d.getEnvelope().getDeliveryTag(), false);
248+
}
249+
} catch (InterruptedException ie) {
250+
fail("interrupted");
251+
}
252+
assertEquals(count / 2, counts.get("c1").intValue());
253+
assertEquals(count / 2, counts.get("c2").intValue());
254+
}
255+
204256
public void testConsumerLifecycle()
205257
throws IOException
206258
{

0 commit comments

Comments
 (0)