@@ -208,31 +208,26 @@ public void testFairness()
208
208
209
209
}
210
210
211
- public void testRoundRobin ()
211
+ public void testSingleChannelAndQueueFairness ()
212
212
throws IOException
213
213
{
214
214
//check that when we have multiple consumers on the same
215
215
//channel & queue, and a prefetch limit set, that all
216
- //consumers get a fair share of the messages
216
+ //consumers get a fair share of the messages.
217
217
218
218
channel .basicQos (1 );
219
219
String q = channel .queueDeclare ().getQueue ();
220
220
channel .queueBind (q , "amq.fanout" , "" );
221
221
222
222
final Map <String , Integer > counts =
223
223
Collections .synchronizedMap (new HashMap <String , Integer >());
224
- final String [] nextTag = new String [] { null };
225
224
226
225
QueueingConsumer c = new QueueingConsumer (channel ) {
227
226
@ Override public void handleDelivery (String consumerTag ,
228
227
Envelope envelope ,
229
228
AMQP .BasicProperties properties ,
230
229
byte [] body )
231
230
throws IOException {
232
- String otherConsumerTag = "c1" .equals (consumerTag ) ? "c2" : "c1" ;
233
- if (null != nextTag [0 ])
234
- assertEquals (consumerTag , nextTag [0 ]);
235
- nextTag [0 ] = otherConsumerTag ;
236
231
counts .put (consumerTag , counts .get (consumerTag ) + 1 );
237
232
super .handleDelivery (consumerTag , envelope ,
238
233
properties , body );
@@ -242,7 +237,7 @@ public void testRoundRobin()
242
237
channel .basicConsume (q , false , "c1" , c );
243
238
channel .basicConsume (q , false , "c2" , c );
244
239
245
- int count = 4 ;
240
+ int count = 10 ;
246
241
counts .put ("c1" , 0 );
247
242
counts .put ("c2" , 0 );
248
243
fill (count );
@@ -254,8 +249,12 @@ public void testRoundRobin()
254
249
} catch (InterruptedException ie ) {
255
250
fail ("interrupted" );
256
251
}
257
- assertEquals (count / 2 , counts .get ("c1" ).intValue ());
258
- assertEquals (count / 2 , counts .get ("c2" ).intValue ());
252
+
253
+ //we only check that the server isn't grossly unfair; perfect
254
+ //fairness is too much to ask for (even though RabbitMQ atm
255
+ //does actually provide it in this case)
256
+ assertTrue (counts .get ("c1" ).intValue () > 0 );
257
+ assertTrue (counts .get ("c2" ).intValue () > 0 );
259
258
}
260
259
261
260
public void testConsumerLifecycle ()
0 commit comments