Skip to content

Commit 159278e

Browse files
author
Matthew Sackman
committed
merging bug20943
2 parents ce26cc0 + c16ba06 commit 159278e

File tree

1 file changed

+57
-0
lines changed

1 file changed

+57
-0
lines changed

test/src/com/rabbitmq/client/test/functional/QosTests.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,19 @@
3838
import java.util.List;
3939
import java.util.Queue;
4040

41+
import java.util.Collections;
42+
import java.util.Map;
43+
import java.util.HashMap;
44+
4145
import com.rabbitmq.client.AMQP;
4246
import com.rabbitmq.client.Channel;
4347
import com.rabbitmq.client.GetResponse;
4448
import com.rabbitmq.client.QueueingConsumer;
4549
import com.rabbitmq.client.QueueingConsumer.Delivery;
4650

51+
import com.rabbitmq.client.AMQP.BasicProperties;
52+
import com.rabbitmq.client.Envelope;
53+
4754
public class QosTests extends BrokerTestCase
4855
{
4956

@@ -201,6 +208,56 @@ public void testFairness()
201208

202209
}
203210

211+
public void testRoundRobin()
212+
throws IOException
213+
{
214+
//check that when we have multiple consumers on the same
215+
//channel & queue, and a prefetch limit set, that all
216+
//consumers get a fair share of the messages
217+
218+
channel.basicQos(1);
219+
String q = channel.queueDeclare().getQueue();
220+
channel.queueBind(q, "amq.fanout", "");
221+
222+
final Map<String, Integer> counts =
223+
Collections.synchronizedMap(new HashMap<String, Integer>());
224+
final String [] nextTag = new String[] { null };
225+
226+
QueueingConsumer c = new QueueingConsumer(channel) {
227+
@Override public void handleDelivery(String consumerTag,
228+
Envelope envelope,
229+
AMQP.BasicProperties properties,
230+
byte[] body)
231+
throws IOException {
232+
String otherConsumerTag = "c1".equals(consumerTag) ? "c2" : "c1";
233+
if (null != nextTag[0])
234+
assertEquals(consumerTag, nextTag[0]);
235+
nextTag[0] = otherConsumerTag;
236+
counts.put(consumerTag, counts.get(consumerTag) + 1);
237+
super.handleDelivery(consumerTag, envelope,
238+
properties, body);
239+
}
240+
};
241+
242+
channel.basicConsume(q, false, "c1", c);
243+
channel.basicConsume(q, false, "c2", c);
244+
245+
int count = 4;
246+
counts.put("c1", 0);
247+
counts.put("c2", 0);
248+
fill(count);
249+
try {
250+
for (int i = 0; i < count; i++) {
251+
Delivery d = c.nextDelivery();
252+
channel.basicAck(d.getEnvelope().getDeliveryTag(), false);
253+
}
254+
} catch (InterruptedException ie) {
255+
fail("interrupted");
256+
}
257+
assertEquals(count / 2, counts.get("c1").intValue());
258+
assertEquals(count / 2, counts.get("c2").intValue());
259+
}
260+
204261
public void testConsumerLifecycle()
205262
throws IOException
206263
{

0 commit comments

Comments
 (0)