25
25
import java .util .Collections ;
26
26
import java .util .concurrent .BlockingDeque ;
27
27
import java .util .concurrent .LinkedBlockingDeque ;
28
- import java .util .concurrent .Phaser ;
29
28
import java .util .concurrent .TimeUnit ;
30
29
31
30
import org .junit .jupiter .api .AfterEach ;
32
31
import org .junit .jupiter .api .BeforeEach ;
33
32
34
- import org .springframework .core .task .SimpleAsyncTaskExecutor ;
35
- import org .springframework .core .task .SyncTaskExecutor ;
36
33
import org .springframework .data .redis .ObjectFactory ;
37
34
import org .springframework .data .redis .connection .RedisConnectionFactory ;
38
35
import org .springframework .data .redis .connection .jedis .JedisConnectionFactory ;
@@ -82,33 +79,18 @@ public static Collection<Object[]> testParams() {
82
79
}
83
80
84
81
@ BeforeEach
85
- void setUp () throws Exception {
82
+ void setUp () {
86
83
bag .clear ();
87
84
88
85
adapter .setSerializer (template .getValueSerializer ());
89
86
adapter .afterPropertiesSet ();
90
87
91
- Phaser phaser = new Phaser (1 );
92
-
93
88
container = new RedisMessageListenerContainer ();
94
89
container .setConnectionFactory (template .getConnectionFactory ());
95
90
container .setBeanName ("container" );
96
91
container .addMessageListener (adapter , Arrays .asList (new ChannelTopic (CHANNEL )));
97
- container .setTaskExecutor (new SyncTaskExecutor ());
98
- container .setSubscriptionExecutor (new SimpleAsyncTaskExecutor () {
99
- @ Override
100
- protected void doExecute (Runnable task ) {
101
- super .doExecute (() -> {
102
- phaser .arriveAndDeregister ();
103
- task .run ();
104
- });
105
- }
106
- });
107
92
container .afterPropertiesSet ();
108
93
container .start ();
109
-
110
- phaser .arriveAndAwaitAdvance ();
111
- Thread .sleep (250 );
112
94
}
113
95
114
96
@ AfterEach
@@ -130,17 +112,18 @@ void testContainerSubscribe() {
130
112
T payload1 = getT ();
131
113
T payload2 = getT ();
132
114
133
- assertThat ( template .convertAndSend (CHANNEL , payload1 )). isEqualTo ( 1L );
134
- assertThat ( template .convertAndSend (CHANNEL , payload2 )). isEqualTo ( 1L );
115
+ template .convertAndSend (CHANNEL , payload1 );
116
+ template .convertAndSend (CHANNEL , payload2 );
135
117
136
118
await ().atMost (Duration .ofSeconds (2 )).until (() -> bag .contains (payload1 ) && bag .contains (payload2 ));
137
119
}
138
120
139
121
@ ParameterizedRedisTest
140
122
void testMessageBatch () throws Exception {
123
+
141
124
int COUNT = 10 ;
142
125
for (int i = 0 ; i < COUNT ; i ++) {
143
- assertThat ( template .convertAndSend (CHANNEL , getT ())). isEqualTo ( 1L );
126
+ template .convertAndSend (CHANNEL , getT ());
144
127
}
145
128
146
129
for (int i = 0 ; i < COUNT ; i ++) {
@@ -155,8 +138,8 @@ void testContainerUnsubscribe() throws Exception {
155
138
T payload2 = getT ();
156
139
157
140
container .removeMessageListener (adapter , new ChannelTopic (CHANNEL ));
158
- assertThat ( template .convertAndSend (CHANNEL , payload1 )). isEqualTo ( 1L );
159
- assertThat ( template .convertAndSend (CHANNEL , payload2 )). isEqualTo ( 1L );
141
+ template .convertAndSend (CHANNEL , payload1 );
142
+ template .convertAndSend (CHANNEL , payload2 );
160
143
161
144
assertThat (bag .poll (200 , TimeUnit .MILLISECONDS )).isNull ();
162
145
}
0 commit comments