21
21
import static org .junit .jupiter .api .Assertions .assertNull ;
22
22
import static org .junit .jupiter .api .Assertions .fail ;
23
23
24
+ import com .rabbitmq .client .test .TestUtils ;
24
25
import com .rabbitmq .client .test .TestUtils .BrokerVersion ;
25
26
import com .rabbitmq .client .test .TestUtils .BrokerVersionAtLeast ;
26
27
import java .io .IOException ;
28
+ import java .time .Duration ;
27
29
import java .util .ArrayList ;
28
30
import java .util .HashMap ;
29
31
import java .util .List ;
@@ -47,6 +49,7 @@ public class Routing extends BrokerTestCase
47
49
protected final String Q2 = "bar" ;
48
50
49
51
private volatile BlockingCell <Integer > returnCell ;
52
+ private static final int TIMEOUT = (int ) Duration .ofSeconds (10 ).toMillis ();
50
53
51
54
protected void createResources () throws IOException {
52
55
channel .exchangeDeclare (E , "direct" );
@@ -289,13 +292,13 @@ public void headersWithXRouting() throws Exception {
289
292
checkGet (Q2 , false );
290
293
}
291
294
292
- @ Test public void basicReturn () throws IOException {
295
+ @ Test public void basicReturn () throws Exception {
293
296
channel .addReturnListener (makeReturnListener ());
294
297
returnCell = new BlockingCell <Integer >();
295
298
296
299
//returned 'mandatory' publish
297
300
channel .basicPublish ("" , "unknown" , true , false , null , "mandatory1" .getBytes ());
298
- checkReturn (AMQP . NO_ROUTE );
301
+ checkReturn ();
299
302
300
303
//routed 'mandatory' publish
301
304
channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
@@ -313,7 +316,7 @@ public void headersWithXRouting() throws Exception {
313
316
}
314
317
}
315
318
316
- @ Test public void basicReturnTransactional () throws IOException {
319
+ @ Test public void basicReturnTransactional () throws Exception {
317
320
channel .txSelect ();
318
321
channel .addReturnListener (makeReturnListener ());
319
322
returnCell = new BlockingCell <Integer >();
@@ -325,39 +328,31 @@ public void headersWithXRouting() throws Exception {
325
328
fail ("basic.return issued prior to tx.commit" );
326
329
} catch (TimeoutException toe ) {}
327
330
channel .txCommit ();
328
- checkReturn (AMQP . NO_ROUTE );
331
+ checkReturn ();
329
332
330
333
//routed 'mandatory' publish
331
334
channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
332
335
channel .txCommit ();
333
336
assertNotNull (channel .basicGet (Q1 , true ));
334
337
335
- //returned 'mandatory' publish when message is routable on
336
- //publish but not on commit
337
- channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
338
- channel .queueDelete (Q1 );
339
- channel .txCommit ();
340
- checkReturn (AMQP .NO_ROUTE );
341
- channel .queueDeclare (Q1 , false , false , false , null );
338
+ if (TestUtils .atMost312 (connection )) {
339
+ //returned 'mandatory' publish when message is routable on
340
+ //publish but not on commit
341
+ channel .basicPublish ("" , Q1 , true , false , null , "mandatory2" .getBytes ());
342
+ channel .queueDelete (Q1 );
343
+ channel .txCommit ();
344
+ checkReturn ();
345
+ channel .queueDeclare (Q1 , false , false , false , null );
346
+ }
342
347
}
343
348
344
349
protected ReturnListener makeReturnListener () {
345
- return new ReturnListener () {
346
- public void handleReturn (int replyCode ,
347
- String replyText ,
348
- String exchange ,
349
- String routingKey ,
350
- AMQP .BasicProperties properties ,
351
- byte [] body )
352
- throws IOException {
353
- Routing .this .returnCell .set (replyCode );
354
- }
355
- };
350
+ return (replyCode , replyText , exchange , routingKey , properties , body ) -> Routing .this .returnCell .set (replyCode );
356
351
}
357
352
358
- protected void checkReturn (int replyCode ) {
359
- assertEquals ((int )returnCell .uninterruptibleGet (), AMQP .NO_ROUTE );
360
- returnCell = new BlockingCell <Integer >();
353
+ protected void checkReturn () throws TimeoutException {
354
+ assertEquals ((int )returnCell .uninterruptibleGet (TIMEOUT ), AMQP .NO_ROUTE );
355
+ returnCell = new BlockingCell <>();
361
356
}
362
357
363
358
}
0 commit comments