17
17
package com .rabbitmq .client .test .functional ;
18
18
19
19
import java .io .IOException ;
20
+ import java .util .concurrent .ArrayBlockingQueue ;
21
+ import java .util .concurrent .BlockingQueue ;
20
22
21
23
import com .rabbitmq .client .Consumer ;
22
24
import com .rabbitmq .client .ConsumerCancelledException ;
@@ -28,81 +30,55 @@ public class ConsumerCancelNotificiation extends BrokerTestCase {
28
30
29
31
private final String queue = "cancel_notification_queue" ;
30
32
31
- private final Object lock = new Object ();
33
+ public void testConsumerCancellationNotification () throws IOException ,
34
+ InterruptedException {
35
+ final BlockingQueue <Boolean > result = new ArrayBlockingQueue <Boolean >(1 );
32
36
33
- private boolean notified = false ;
34
-
35
- private boolean failed = false ;
36
-
37
- public void testConsumerCancellationNotification () throws IOException {
38
- synchronized (lock ) {
39
- notified = false ;
40
- }
41
37
channel .queueDeclare (queue , false , true , false , null );
42
38
Consumer consumer = new QueueingConsumer (channel ) {
43
39
@ Override
44
40
public void handleCancel (String consumerTag ) throws IOException {
45
- synchronized (lock ) {
46
- notified = true ;
47
- lock .notifyAll ();
41
+ try {
42
+ result .put (true );
43
+ } catch (InterruptedException e ) {
44
+ fail ();
48
45
}
49
46
}
50
47
};
51
48
channel .basicConsume (queue , consumer );
52
49
channel .queueDelete (queue );
53
- synchronized (lock ) {
54
- if (!notified ) {
55
- try {
56
- lock .wait ();
57
- } catch (InterruptedException e ) {
58
- }
59
- }
60
- assertTrue (notified );
61
- }
50
+ assertTrue (result .take ());
62
51
}
63
52
64
53
public void testConsumerCancellationInterruptsQueuingConsumerWait ()
65
54
throws IOException , InterruptedException {
66
- synchronized (lock ) {
67
- notified = false ;
68
- failed = false ;
69
- }
55
+ final BlockingQueue <Boolean > result = new ArrayBlockingQueue <Boolean >(1 );
70
56
channel .queueDeclare (queue , false , true , false , null );
71
57
final QueueingConsumer consumer = new QueueingConsumer (channel );
72
58
Runnable receiver = new Runnable () {
73
59
74
60
@ Override
75
61
public void run () {
76
62
try {
77
- consumer .nextDelivery ();
78
- } catch (ConsumerCancelledException e ) {
79
- synchronized (lock ) {
80
- notified = true ;
81
- lock .notifyAll ();
82
- return ; // avoid fall through to failure
63
+ try {
64
+ consumer .nextDelivery ();
65
+ } catch (ConsumerCancelledException e ) {
66
+ result .put (true );
67
+ return ;
68
+ } catch (ShutdownSignalException e ) {
69
+ } catch (InterruptedException e ) {
83
70
}
84
- } catch ( ShutdownSignalException e ) {
71
+ result . put ( false );
85
72
} catch (InterruptedException e ) {
86
- }
87
- synchronized (lock ) {
88
- failed = true ;
89
- lock .notifyAll ();
73
+ fail ();
90
74
}
91
75
}
92
76
};
93
77
Thread t = new Thread (receiver );
94
78
t .start ();
95
79
channel .basicConsume (queue , consumer );
96
80
channel .queueDelete (queue );
97
- synchronized (lock ) {
98
- if (!(notified || failed )) {
99
- try {
100
- lock .wait ();
101
- } catch (InterruptedException e ) {
102
- }
103
- }
104
- assertTrue (notified );
105
- }
81
+ assertTrue (result .take ());
106
82
t .join ();
107
83
}
108
84
}
0 commit comments