@@ -60,34 +60,21 @@ public void testExpiryWhenConsumerIsLateToTheParty() throws Exception {
60
60
}
61
61
62
62
public void testRestartingExpiry () throws Exception {
63
- final String restartDelay = "2000" ;
63
+ final String expiryDelay = "2000" ;
64
64
declareDurableQueue (TTL_QUEUE_NAME );
65
- channel .basicPublish ("" , TTL_QUEUE_NAME ,
65
+ bindQueue ();
66
+ channel .basicPublish (TTL_EXCHANGE , TTL_QUEUE_NAME ,
66
67
MessageProperties .MINIMAL_PERSISTENT_BASIC
67
68
.builder ()
68
- .expiration (restartDelay )
69
+ .expiration (expiryDelay )
69
70
.build (), new byte []{});
70
-
71
- Thread delayedConsume =
72
- new Thread (new Runnable () {
73
- @ Override
74
- public void run () {
75
- try {
76
- Thread .sleep (Integer .parseInt (restartDelay ));
77
- while (channel == null || !channel .isOpen ()) {
78
- Thread .sleep (250 );
79
- }
80
- retrievedMsg = get ();
81
- } catch (IOException e ) {
82
- } catch (InterruptedException e ) {
83
- }
84
-
85
- }
86
- });
87
- delayedConsume .start ();
71
+ long expiryStartTime = System .currentTimeMillis ();
88
72
restart ();
89
- delayedConsume .join ();
90
- assertNull ("Message should have expired after broker restart" , retrievedMsg );
73
+ long timeToExpiry = Integer .parseInt (expiryDelay ) - (System .currentTimeMillis () - expiryStartTime );
74
+ if (timeToExpiry > 0L ) {
75
+ Thread .sleep (timeToExpiry );
76
+ }
77
+ assertNull ("Message should have expired after broker restart" , get ());
91
78
}
92
79
93
80
}
0 commit comments