46
46
47
47
/**
48
48
* @author Costin Leau
49
+ * @author Jennifer Hickey
49
50
*/
50
51
public class PubSubResubscribeTests {
51
52
52
53
private static final String CHANNEL = "pubsub::test" ;
53
- private final Long ZERO = Long .valueOf (0 );
54
- private final Long ONE = Long .valueOf (1 );
55
- private final Long TWO = Long .valueOf (2 );
56
54
57
55
protected RedisMessageListenerContainer container ;
58
56
protected RedisConnectionFactory factory ;
59
57
protected RedisTemplate template ;
60
58
61
59
private final BlockingDeque <String > bag = new LinkedBlockingDeque <String >(99 );
62
60
63
- private final Object handler = new Object () {
64
- public void handleMessage (String message ) {
65
- System .out .println (message );
66
- bag .add (message );
67
- }
68
- };
61
+ private final Object handler = new MessageHandler ("handler1" , bag );
69
62
70
63
private final MessageListenerAdapter adapter = new MessageListenerAdapter (handler );
71
64
@@ -121,7 +114,8 @@ public void testContainerPatternResubscribe() throws Exception {
121
114
final String PATTERN = "p*" ;
122
115
final String ANOTHER_CHANNEL = "pubsub::test::extra" ;
123
116
124
- MessageListenerAdapter anotherListener = new MessageListenerAdapter (handler );
117
+ BlockingDeque <String > bag2 = new LinkedBlockingDeque <String >(99 );
118
+ MessageListenerAdapter anotherListener = new MessageListenerAdapter (new MessageHandler ("handler2" , bag2 ));
125
119
anotherListener .setSerializer (template .getValueSerializer ());
126
120
anotherListener .afterPropertiesSet ();
127
121
@@ -130,29 +124,36 @@ public void testContainerPatternResubscribe() throws Exception {
130
124
container .removeMessageListener (adapter );
131
125
132
126
// test no messages are sent just to patterns
133
- assertEquals ( ONE , template .convertAndSend (CHANNEL , payload1 ) );
134
- assertEquals ( ONE , template .convertAndSend (ANOTHER_CHANNEL , payload2 ) );
127
+ template .convertAndSend (CHANNEL , payload1 );
128
+ template .convertAndSend (ANOTHER_CHANNEL , payload2 );
135
129
130
+ // anotherListener receives both messages
136
131
List <String > msgs = new ArrayList <String >();
137
- msgs .add (bag .poll (1 , TimeUnit .SECONDS ));
138
- msgs .add (bag .poll (1 , TimeUnit .SECONDS ));
132
+ msgs .add (bag2 .poll (1 , TimeUnit .SECONDS ));
133
+ msgs .add (bag2 .poll (1 , TimeUnit .SECONDS ));
139
134
140
135
assertEquals (2 , msgs .size ());
136
+ assertTrue (msgs .contains (payload1 ));
137
+ assertTrue (msgs .contains (payload2 ));
138
+ msgs .clear ();
139
+
140
+ // unsubscribed adapter did not receive message
141
+ assertNull (bag .poll (1 , TimeUnit .SECONDS ));
142
+
141
143
// bind original listener on another channel
142
144
container .addMessageListener (adapter , new ChannelTopic (ANOTHER_CHANNEL ));
143
145
144
- assertEquals ( ONE , template .convertAndSend (CHANNEL , payload1 ) );
145
- assertEquals ( TWO , template .convertAndSend (ANOTHER_CHANNEL , payload2 ) );
146
+ template .convertAndSend (CHANNEL , payload1 );
147
+ template .convertAndSend (ANOTHER_CHANNEL , payload2 );
146
148
147
- msgs .add (bag .poll (1 , TimeUnit .SECONDS ));
148
- msgs .add (bag .poll (1 , TimeUnit .SECONDS ));
149
- msgs .add (bag .poll (1 , TimeUnit .SECONDS ));
150
- // this message will not arrive on time
149
+ // original listener received only one message on another channel
150
+ assertEquals (payload2 ,bag .poll (1 , TimeUnit .SECONDS ));
151
151
assertNull (bag .poll (1 , TimeUnit .SECONDS ));
152
152
153
- // same message received first per channel subscription, second based on the pattern
154
- assertEquals (5 , msgs .size ());
155
-
153
+ //another listener receives messages on both channels
154
+ msgs .add (bag2 .poll (1 , TimeUnit .SECONDS ));
155
+ msgs .add (bag2 .poll (1 , TimeUnit .SECONDS ));
156
+ assertEquals (2 , msgs .size ());
156
157
assertTrue (msgs .contains (payload1 ));
157
158
assertTrue (msgs .contains (payload2 ));
158
159
}
@@ -171,22 +172,36 @@ public void testContainerChannelResubscribe() throws Exception {
171
172
container .addMessageListener (adapter , new ChannelTopic (ANOTHER_CHANNEL ));
172
173
container .removeMessageListener (null , new ChannelTopic (CHANNEL ));
173
174
174
- assertEquals (ZERO , template .convertAndSend (CHANNEL , payload1 ));
175
- assertEquals (ZERO , template .convertAndSend (CHANNEL , payload2 ));
175
+ // Listener removed from channel
176
+ template .convertAndSend (CHANNEL , payload1 );
177
+ template .convertAndSend (CHANNEL , payload2 );
176
178
177
- assertEquals (ONE , template .convertAndSend (ANOTHER_CHANNEL , anotherPayload1 ));
178
- assertEquals (ONE , template .convertAndSend (ANOTHER_CHANNEL , anotherPayload2 ));
179
+ // Listener receives messages on another channel
180
+ template .convertAndSend (ANOTHER_CHANNEL , anotherPayload1 );
181
+ template .convertAndSend (ANOTHER_CHANNEL , anotherPayload2 );
179
182
180
183
Set <String > set = new LinkedHashSet <String >();
181
184
set .add (bag .poll (1 , TimeUnit .SECONDS ));
182
185
set .add (bag .poll (1 , TimeUnit .SECONDS ));
183
186
184
- System .out .println (set );
185
-
186
187
assertFalse (set .contains (payload1 ));
187
188
assertFalse (set .contains (payload2 ));
188
189
189
190
assertTrue (set .contains (anotherPayload1 ));
190
191
assertTrue (set .contains (anotherPayload2 ));
191
192
}
193
+
194
+ private class MessageHandler {
195
+ private final BlockingDeque <String > bag ;
196
+ private final String name ;
197
+
198
+ public MessageHandler (String name , BlockingDeque <String > bag ) {
199
+ this .bag = bag ;
200
+ this .name = name ;
201
+ }
202
+ public void handleMessage (String message ) {
203
+ System .out .println (name + ": " + message );
204
+ bag .add (message );
205
+ }
206
+ }
192
207
}
0 commit comments