16
16
17
17
import static com .rabbitmq .stream .impl .Assertions .assertThat ;
18
18
import static com .rabbitmq .stream .impl .LoadBalancerClusterTest .LOAD_BALANCER_ADDRESS ;
19
+ import static com .rabbitmq .stream .impl .TestUtils .newLoggerLevel ;
19
20
import static com .rabbitmq .stream .impl .TestUtils .sync ;
20
- import static io . vavr . Tuple . of ;
21
+ import static com . rabbitmq . stream . impl . Tuples . pair ;
21
22
import static java .time .Duration .ofSeconds ;
22
23
import static java .util .stream .Collectors .toList ;
23
24
import static java .util .stream .IntStream .range ;
24
25
import static org .assertj .core .api .Assertions .assertThat ;
25
26
26
27
import ch .qos .logback .classic .Level ;
28
+ import com .google .common .collect .Streams ;
27
29
import com .google .common .util .concurrent .RateLimiter ;
28
30
import com .rabbitmq .stream .*;
29
31
import com .rabbitmq .stream .impl .TestUtils .Sync ;
32
+ import com .rabbitmq .stream .impl .Tuples .Pair ;
30
33
import io .netty .channel .EventLoopGroup ;
31
- import io .vavr .Tuple2 ;
32
34
import java .nio .charset .StandardCharsets ;
33
35
import java .time .Duration ;
34
36
import java .util .LinkedHashMap ;
@@ -58,13 +60,14 @@ public class RecoveryClusterTest {
58
60
TestInfo testInfo ;
59
61
EventLoopGroup eventLoopGroup ;
60
62
EnvironmentBuilder environmentBuilder ;
61
- static Level producersCoordinatorLogLevel ;
63
+ static List <Level > logLevels ;
64
+ static List <Class <?>> logClasses =
65
+ List .of (ProducersCoordinator .class , ConsumersCoordinator .class , StreamEnvironment .class );
62
66
63
67
@ BeforeAll
64
68
static void initAll () {
65
69
nodes = Cli .nodes ();
66
- producersCoordinatorLogLevel =
67
- TestUtils .newLoggerLevel (ProducersCoordinator .class , Level .DEBUG );
70
+ logLevels = logClasses .stream ().map (c -> newLoggerLevel (c , Level .DEBUG )).collect (toList ());
68
71
}
69
72
70
73
@ BeforeEach
@@ -88,15 +91,16 @@ void tearDown() {
88
91
89
92
@ AfterAll
90
93
static void tearDownAll () {
91
- if (producersCoordinatorLogLevel != null ) {
92
- TestUtils .newLoggerLevel (ProducersCoordinator .class , producersCoordinatorLogLevel );
94
+ if (logLevels != null ) {
95
+ Streams .zip (logClasses .stream (), logLevels .stream (), Tuples ::pair )
96
+ .forEach (t -> newLoggerLevel (t .v1 (), t .v2 ()));
93
97
}
94
98
}
95
99
96
100
@ ParameterizedTest
97
101
@ CsvSource ({
98
- "false,false" ,
99
- "true,true" ,
102
+ // "false,false",
103
+ // "true,true",
100
104
"true,false" ,
101
105
})
102
106
void clusterRestart (boolean useLoadBalancer , boolean forceLeader ) throws InterruptedException {
@@ -167,17 +171,27 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
167
171
168
172
Thread .sleep (BACK_OFF_DELAY_POLICY .delay (0 ).multipliedBy (2 ).toMillis ());
169
173
170
- List <Tuple2 <String , Sync >> streamsSyncs =
171
- producers .stream ().map (p -> of (p .stream (), p .waitForNewMessages (1000 ))).collect (toList ());
174
+ List <Pair <String , Sync >> streamsSyncs =
175
+ producers .stream ()
176
+ .map (p -> pair (p .stream (), p .waitForNewMessages (1000 )))
177
+ .collect (toList ());
172
178
streamsSyncs .forEach (
173
- t -> {
174
- LOGGER .info ("Checking publisher to {} still publishes" , t . _1 ());
175
- assertThat (t . _2 ()).completes ();
176
- LOGGER .info ("Publisher to {} still publishes" , t . _1 ());
179
+ p -> {
180
+ LOGGER .info ("Checking publisher to {} still publishes" , p . v1 ());
181
+ assertThat (p . v2 ()).completes ();
182
+ LOGGER .info ("Publisher to {} still publishes" , p . v1 ());
177
183
});
178
184
179
- syncs = consumers .stream ().map (c -> c .waitForNewMessages (1000 )).collect (toList ());
180
- syncs .forEach (s -> assertThat (s ).completes ());
185
+ streamsSyncs =
186
+ consumers .stream ()
187
+ .map (c -> pair (c .stream (), c .waitForNewMessages (1000 )))
188
+ .collect (toList ());
189
+ streamsSyncs .forEach (
190
+ p -> {
191
+ LOGGER .info ("Checking consumer from {} still consumes" , p .v1 ());
192
+ assertThat (p .v2 ()).completes ();
193
+ LOGGER .info ("Consumer from {} still consumes" , p .v1 ());
194
+ });
181
195
182
196
Map <String , Long > committedChunkIdPerStream = new LinkedHashMap <>(streamCount );
183
197
streams .forEach (
@@ -271,11 +285,13 @@ public void close() {
271
285
272
286
private static class ConsumerState implements AutoCloseable {
273
287
288
+ private final String stream ;
274
289
private final Consumer consumer ;
275
290
final AtomicInteger receivedCount = new AtomicInteger ();
276
291
final AtomicReference <Runnable > postHandle = new AtomicReference <>(() -> {});
277
292
278
293
private ConsumerState (String stream , Environment environment ) {
294
+ this .stream = stream ;
279
295
this .consumer =
280
296
environment .consumerBuilder ().stream (stream )
281
297
.offset (OffsetSpecification .first ())
@@ -300,6 +316,10 @@ Sync waitForNewMessages(int messageCount) {
300
316
return sync ;
301
317
}
302
318
319
+ String stream () {
320
+ return this .stream ;
321
+ }
322
+
303
323
@ Override
304
324
public void close () {
305
325
this .consumer .close ();
0 commit comments