@@ -627,14 +627,14 @@ void duplicatesWhenResubscribeAfterDisconnectionWithLongFlushInterval() throws E
627
627
AtomicInteger receivedMessages = new AtomicInteger (0 );
628
628
int storeEvery = 10_000 ;
629
629
String reference = "ref-1" ;
630
- CountDownLatch poisonLatch = new CountDownLatch ( 1 );
630
+ AtomicBoolean receivedPoison = new AtomicBoolean ( false );
631
631
environment .consumerBuilder ().name (reference ).stream (stream )
632
632
.offset (OffsetSpecification .first ())
633
633
.messageHandler (
634
634
(context , message ) -> {
635
635
receivedMessages .incrementAndGet ();
636
636
if ("poison" .equals (new String (message .getBodyAsBinary ()))) {
637
- poisonLatch . countDown ( );
637
+ receivedPoison . set ( true );
638
638
}
639
639
})
640
640
.autoTrackingStrategy ()
@@ -660,9 +660,15 @@ void duplicatesWhenResubscribeAfterDisconnectionWithLongFlushInterval() throws E
660
660
Host .killConnection ("rabbitmq-stream-consumer-0" );
661
661
662
662
publish .accept (storeEvery * 2 );
663
- producer .send (
664
- producer .messageBuilder ().addData ("poison" .getBytes ()).build (), confirmationStatus -> {});
665
- latchAssert (poisonLatch ).completes ();
663
+ waitAtMost (
664
+ () -> {
665
+ producer .send (
666
+ producer .messageBuilder ().addData ("poison" .getBytes ()).build (),
667
+ confirmationStatus -> {});
668
+ publishedMessages .incrementAndGet ();
669
+ return receivedPoison .get ();
670
+ });
671
+
666
672
// we have duplicates because the last stored value is behind and the re-subscription uses it
667
673
assertThat (receivedMessages ).hasValueGreaterThan (publishedMessages .get ());
668
674
}
0 commit comments