@@ -94,6 +94,37 @@ private static void publishToPartitions(
94
94
latchAssert (publishLatch ).completes ();
95
95
}
96
96
97
+ static AutoCloseable publishToPartitions (TestUtils .ClientFactory cf , List <String > partitions ) {
98
+ Client client = cf .get ();
99
+ for (int i = 0 ; i < partitions .size (); i ++) {
100
+ assertThat (client .declarePublisher (b (i ), null , partitions .get (i )).isOk ()).isTrue ();
101
+ }
102
+ Runnable publish =
103
+ () -> {
104
+ int count = 0 ;
105
+ while (!Thread .currentThread ().isInterrupted ()) {
106
+ int partitionIndex = count ++ % partitions .size ();
107
+ String partition = partitions .get (partitionIndex );
108
+ client .publish (
109
+ b (partitionIndex ),
110
+ Collections .singletonList (
111
+ client
112
+ .messageBuilder ()
113
+ .addData (partition .getBytes (StandardCharsets .UTF_8 ))
114
+ .build ()));
115
+ try {
116
+ Thread .sleep (10 );
117
+ } catch (InterruptedException e ) {
118
+ Thread .currentThread ().interrupt ();
119
+ break ;
120
+ }
121
+ }
122
+ };
123
+ Thread thread = new Thread (publish );
124
+ thread .start ();
125
+ return thread ::interrupt ;
126
+ }
127
+
97
128
@ Test
98
129
void consumeAllMessagesFromAllPartitions () {
99
130
declareSuperStreamTopology (configurationClient , superStream , partitionCount );
@@ -299,62 +330,62 @@ void autoOffsetTrackingShouldStoreOffsetZero() {
299
330
@ BrokerVersionAtLeast (RABBITMQ_3_11_11 )
300
331
void rebalancedPartitionShouldGetMessagesWhenItComesBackToOriginalConsumerInstance ()
301
332
throws Exception {
333
+ Duration timeout = Duration .ofSeconds (60 );
302
334
declareSuperStreamTopology (configurationClient , superStream , partitionCount );
303
335
Client client = cf .get ();
304
336
List <String > partitions = client .partitions (superStream );
305
- int messageCount = 10_000 ;
306
- publishToPartitions (cf , partitions , messageCount );
307
- String consumerName = "my-app" ;
308
- Set <String > receivedPartitions = ConcurrentHashMap .newKeySet (partitionCount );
309
- Runnable processing =
310
- () -> {
311
- try {
312
- Thread .sleep (10 );
313
- } catch (InterruptedException e ) {
314
- // OK
315
- }
316
- };
317
- Consumer consumer1 =
318
- environment
319
- .consumerBuilder ()
320
- .superStream (superStream )
321
- .singleActiveConsumer ()
322
- .offset (OffsetSpecification .first ())
323
- .name (consumerName )
324
- .autoTrackingStrategy ()
325
- .messageCountBeforeStorage (messageCount / partitionCount / 50 )
326
- .builder ()
327
- .messageHandler (
328
- (context , message ) -> {
329
- receivedPartitions .add (context .stream ());
330
- processing .run ();
331
- })
332
- .build ();
333
- waitAtMost (() -> receivedPartitions .size () == partitions .size (),
334
- () -> format ("Expected to receive messages from all partitions, got %s" , receivedPartitions ));
337
+ try (AutoCloseable publish = publishToPartitions (cf , partitions )) {
338
+ int messageCountBeforeStorage = 10 ;
339
+ String consumerName = "my-app" ;
340
+ Set <String > receivedPartitions = ConcurrentHashMap .newKeySet (partitionCount );
341
+ Consumer consumer1 =
342
+ environment
343
+ .consumerBuilder ()
344
+ .superStream (superStream )
345
+ .singleActiveConsumer ()
346
+ .offset (OffsetSpecification .first ())
347
+ .name (consumerName )
348
+ .autoTrackingStrategy ()
349
+ .messageCountBeforeStorage (messageCountBeforeStorage )
350
+ .builder ()
351
+ .messageHandler (
352
+ (context , message ) -> {
353
+ receivedPartitions .add (context .stream ());
354
+ })
355
+ .build ();
356
+ waitAtMost (
357
+ timeout ,
358
+ () -> receivedPartitions .size () == partitions .size (),
359
+ () ->
360
+ format (
361
+ "Expected to receive messages from all partitions, got %s" , receivedPartitions ));
335
362
336
- AtomicReference <String > partition = new AtomicReference <>();
337
- Consumer consumer2 =
338
- environment
339
- .consumerBuilder ()
340
- .superStream (superStream )
341
- .singleActiveConsumer ()
342
- .offset (OffsetSpecification .first ())
343
- .name (consumerName )
344
- .autoTrackingStrategy ()
345
- .messageCountBeforeStorage (messageCount / partitionCount / 50 )
346
- .builder ()
347
- .messageHandler (
348
- (context , message ) -> {
349
- partition .set (context .stream ());
350
- processing .run ();
351
- })
352
- .build ();
353
- waitAtMost (() -> partition .get () != null );
354
- consumer2 .close ();
355
- receivedPartitions .clear ();
356
- waitAtMost (() -> receivedPartitions .size () == partitions .size (),
357
- () -> format ("Expected to receive messages from all partitions, got %s" , receivedPartitions ));
358
- consumer1 .close ();
363
+ AtomicReference <String > partition = new AtomicReference <>();
364
+ Consumer consumer2 =
365
+ environment
366
+ .consumerBuilder ()
367
+ .superStream (superStream )
368
+ .singleActiveConsumer ()
369
+ .offset (OffsetSpecification .first ())
370
+ .name (consumerName )
371
+ .autoTrackingStrategy ()
372
+ .messageCountBeforeStorage (messageCountBeforeStorage )
373
+ .builder ()
374
+ .messageHandler (
375
+ (context , message ) -> {
376
+ partition .set (context .stream ());
377
+ })
378
+ .build ();
379
+ waitAtMost (timeout , () -> partition .get () != null );
380
+ consumer2 .close ();
381
+ receivedPartitions .clear ();
382
+ waitAtMost (
383
+ timeout ,
384
+ () -> receivedPartitions .size () == partitions .size (),
385
+ () ->
386
+ format (
387
+ "Expected to receive messages from all partitions, got %s" , receivedPartitions ));
388
+ consumer1 .close ();
389
+ }
359
390
}
360
391
}
0 commit comments