19
19
import static com .rabbitmq .stream .impl .TestUtils .latchAssert ;
20
20
import static com .rabbitmq .stream .impl .TestUtils .localhost ;
21
21
import static com .rabbitmq .stream .impl .TestUtils .waitAtMost ;
22
+ import static java .util .stream .Collectors .toList ;
22
23
import static org .assertj .core .api .Assertions .assertThat ;
23
24
24
25
import com .rabbitmq .client .Connection ;
37
38
import com .rabbitmq .stream .impl .Utils .CompositeConsumerUpdateListener ;
38
39
import io .netty .channel .EventLoopGroup ;
39
40
import java .nio .charset .StandardCharsets ;
41
+ import java .time .Duration ;
40
42
import java .util .Collections ;
41
43
import java .util .List ;
42
44
import java .util .Map ;
45
47
import java .util .concurrent .CountDownLatch ;
46
48
import java .util .concurrent .atomic .AtomicInteger ;
47
49
import java .util .function .Function ;
48
- import java .util .stream .Collectors ;
49
50
import java .util .stream .IntStream ;
50
51
import org .junit .jupiter .api .AfterEach ;
51
52
import org .junit .jupiter .api .BeforeEach ;
@@ -275,9 +276,7 @@ void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
275
276
void sacShouldSpreadAcrossPartitions () throws Exception {
276
277
declareSuperStreamTopology (connection , superStream , partitionCount );
277
278
List <String > partitions =
278
- IntStream .range (0 , partitionCount )
279
- .mapToObj (i -> superStream + "-" + i )
280
- .collect (Collectors .toList ());
279
+ IntStream .range (0 , partitionCount ).mapToObj (i -> superStream + "-" + i ).collect (toList ());
281
280
Map <String , Status > consumerStates = new ConcurrentHashMap <>();
282
281
String consumerName = "my-app" ;
283
282
Function <String , Consumer > consumerCreator =
@@ -369,9 +368,7 @@ void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception {
369
368
AtomicInteger messageWaveCount = new AtomicInteger ();
370
369
int superConsumerCount = 3 ;
371
370
List <String > partitions =
372
- IntStream .range (0 , partitionCount )
373
- .mapToObj (i -> superStream + "-" + i )
374
- .collect (Collectors .toList ());
371
+ IntStream .range (0 , partitionCount ).mapToObj (i -> superStream + "-" + i ).collect (toList ());
375
372
Map <String , Boolean > consumerStates =
376
373
new ConcurrentHashMap <>(partitionCount * superConsumerCount );
377
374
Map <String , AtomicInteger > receivedMessages =
@@ -546,7 +543,7 @@ void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception {
546
543
partitions .forEach (
547
544
partition ->
548
545
assertThat (lastReceivedOffsets .get (partition ))
549
- .isGreaterThanOrEqualTo (expectedMessageCountPerPartition )
546
+ .isGreaterThanOrEqualTo (expectedMessageCountPerPartition - 1 )
550
547
.isEqualTo (c .queryOffset (consumerName , partition ).getOffset ()));
551
548
}
552
549
@@ -559,9 +556,7 @@ void sacManualOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception {
559
556
AtomicInteger messageWaveCount = new AtomicInteger ();
560
557
int superConsumerCount = 3 ;
561
558
List <String > partitions =
562
- IntStream .range (0 , partitionCount )
563
- .mapToObj (i -> superStream + "-" + i )
564
- .collect (Collectors .toList ());
559
+ IntStream .range (0 , partitionCount ).mapToObj (i -> superStream + "-" + i ).collect (toList ());
565
560
Map <String , Boolean > consumerStates =
566
561
new ConcurrentHashMap <>(partitionCount * superConsumerCount );
567
562
Map <String , AtomicInteger > receivedMessages =
@@ -627,7 +622,7 @@ void sacManualOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception {
627
622
.superStream (superStream )
628
623
.offset (initialOffsetSpecification )
629
624
.name (consumerName )
630
- .autoTrackingStrategy ()
625
+ .manualTrackingStrategy ()
631
626
.builder ()
632
627
.consumerUpdateListener (consumerUpdateListener )
633
628
.messageHandler (
@@ -758,7 +753,182 @@ void sacManualOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception {
758
753
partitions .forEach (
759
754
partition ->
760
755
assertThat (lastReceivedOffsets .get (partition ))
761
- .isGreaterThanOrEqualTo (expectedMessageCountPerPartition )
756
+ .isGreaterThanOrEqualTo (expectedMessageCountPerPartition - 1 )
762
757
.isEqualTo (c .queryOffset (consumerName , partition ).getOffset ()));
763
758
}
759
+
760
+ @ Test
761
+ @ SingleActiveConsumer
762
+ void sacCustomOffsetTrackingShouldTakeOverOnRelanbancing () throws Exception {
763
+ declareSuperStreamTopology (connection , superStream , partitionCount );
764
+ int messageCount = 5_000 ;
765
+ AtomicInteger messageWaveCount = new AtomicInteger ();
766
+ int superConsumerCount = 3 ;
767
+ List <String > partitions =
768
+ IntStream .range (0 , partitionCount ).mapToObj (i -> superStream + "-" + i ).collect (toList ());
769
+ Map <String , Boolean > consumerStates =
770
+ new ConcurrentHashMap <>(partitionCount * superConsumerCount );
771
+ Map <String , AtomicInteger > receivedMessages =
772
+ new ConcurrentHashMap <>(partitionCount * superConsumerCount );
773
+ Map <String , AtomicInteger > receivedMessagesPerPartitions =
774
+ new ConcurrentHashMap <>(partitionCount );
775
+ Map <String , Map <String , Consumer >> consumers = new ConcurrentHashMap <>(superConsumerCount );
776
+ IntStream .range (0 , superConsumerCount )
777
+ .mapToObj (String ::valueOf )
778
+ .forEach (
779
+ consumer ->
780
+ partitions .forEach (
781
+ partition -> {
782
+ consumers .put (consumer , new ConcurrentHashMap <>(partitionCount ));
783
+ consumerStates .put (consumer + partition , false );
784
+ receivedMessages .put (consumer + partition , new AtomicInteger (0 ));
785
+ }));
786
+ Map <String , Long > lastReceivedOffsets = new ConcurrentHashMap <>();
787
+ partitions .forEach (
788
+ partition -> {
789
+ lastReceivedOffsets .put (partition , 0L );
790
+ receivedMessagesPerPartitions .put (partition , new AtomicInteger (0 ));
791
+ });
792
+ Runnable publishOnAllPartitions =
793
+ () -> {
794
+ partitions .forEach (
795
+ partition -> TestUtils .publishAndWaitForConfirms (cf , messageCount , partition ));
796
+ messageWaveCount .incrementAndGet ();
797
+ };
798
+ String consumerName = "my-app" ;
799
+
800
+ OffsetSpecification initialOffsetSpecification = OffsetSpecification .first ();
801
+ Function <String , Consumer > consumerCreator =
802
+ consumer -> {
803
+ ConsumerUpdateListener consumerUpdateListener =
804
+ context -> {
805
+ consumers .get (consumer ).putIfAbsent (context .stream (), context .consumer ());
806
+ consumerStates .put (consumer + context .stream (), context .status () == Status .ACTIVE );
807
+ OffsetSpecification offsetSpecification = null ;
808
+ if (context .status () == Status .ACTIVE ) {
809
+ Long lastReceivedOffset = lastReceivedOffsets .get (context .stream ());
810
+ offsetSpecification =
811
+ lastReceivedOffset == null
812
+ ? initialOffsetSpecification
813
+ : OffsetSpecification .offset (lastReceivedOffset + 1 );
814
+ }
815
+ return offsetSpecification ;
816
+ };
817
+ return environment
818
+ .consumerBuilder ()
819
+ .singleActiveConsumer ()
820
+ .superStream (superStream )
821
+ .offset (initialOffsetSpecification )
822
+ .name (consumerName )
823
+ .manualTrackingStrategy ()
824
+ .checkInterval (Duration .ZERO ) // no background task for this consumer
825
+ .builder ()
826
+ .consumerUpdateListener (consumerUpdateListener )
827
+ .messageHandler (
828
+ (context , message ) -> {
829
+ lastReceivedOffsets .put (context .stream (), context .offset ());
830
+ receivedMessagesPerPartitions .get (context .stream ()).incrementAndGet ();
831
+ receivedMessages .get (consumer + context .stream ()).incrementAndGet ();
832
+ })
833
+ .build ();
834
+ };
835
+
836
+ Consumer consumer0 = consumerCreator .apply ("0" );
837
+ waitAtMost (
838
+ () ->
839
+ consumerStates .get ("0" + partitions .get (0 ))
840
+ && consumerStates .get ("0" + partitions .get (1 ))
841
+ && consumerStates .get ("0" + partitions .get (2 )));
842
+
843
+ publishOnAllPartitions .run ();
844
+
845
+ waitAtMost (
846
+ () ->
847
+ receivedMessages .get ("0" + partitions .get (0 )).get () == messageCount
848
+ && receivedMessages .get ("0" + partitions .get (1 )).get () == messageCount
849
+ && receivedMessages .get ("0" + partitions .get (2 )).get () == messageCount );
850
+
851
+ Consumer consumer1 = consumerCreator .apply ("1" );
852
+
853
+ waitAtMost (
854
+ () ->
855
+ consumerStates .get ("0" + partitions .get (0 ))
856
+ && consumerStates .get ("1" + partitions .get (1 ))
857
+ && consumerStates .get ("0" + partitions .get (2 )));
858
+
859
+ publishOnAllPartitions .run ();
860
+
861
+ waitAtMost (
862
+ () ->
863
+ receivedMessages .get ("0" + partitions .get (0 )).get () == messageCount * 2
864
+ && receivedMessages .get ("1" + partitions .get (1 )).get () == messageCount
865
+ && receivedMessages .get ("0" + partitions .get (2 )).get () == messageCount * 2 );
866
+
867
+ Consumer consumer2 = consumerCreator .apply ("2" );
868
+
869
+ waitAtMost (
870
+ () ->
871
+ consumerStates .get ("0" + partitions .get (0 ))
872
+ && consumerStates .get ("1" + partitions .get (1 ))
873
+ && consumerStates .get ("2" + partitions .get (2 )));
874
+
875
+ publishOnAllPartitions .run ();
876
+
877
+ waitAtMost (
878
+ () ->
879
+ receivedMessages .get ("0" + partitions .get (0 )).get () == messageCount * 3
880
+ && receivedMessages .get ("1" + partitions .get (1 )).get () == messageCount * 2
881
+ && receivedMessages .get ("2" + partitions .get (2 )).get () == messageCount );
882
+
883
+ consumer0 .close ();
884
+
885
+ waitAtMost (
886
+ () ->
887
+ consumerStates .get ("1" + partitions .get (0 ))
888
+ && consumerStates .get ("2" + partitions .get (1 ))
889
+ && consumerStates .get ("1" + partitions .get (2 )));
890
+
891
+ publishOnAllPartitions .run ();
892
+
893
+ waitAtMost (
894
+ () ->
895
+ receivedMessages .get ("1" + partitions .get (0 )).get () == messageCount
896
+ && receivedMessages .get ("2" + partitions .get (1 )).get () == messageCount
897
+ && receivedMessages .get ("1" + partitions .get (2 )).get () == messageCount );
898
+
899
+ consumer1 .close ();
900
+
901
+ waitAtMost (
902
+ () ->
903
+ consumerStates .get ("2" + partitions .get (0 ))
904
+ && consumerStates .get ("2" + partitions .get (1 ))
905
+ && consumerStates .get ("2" + partitions .get (2 )));
906
+
907
+ publishOnAllPartitions .run ();
908
+
909
+ waitAtMost (
910
+ () ->
911
+ receivedMessages .get ("2" + partitions .get (0 )).get () == messageCount
912
+ && receivedMessages .get ("2" + partitions .get (1 )).get () == messageCount * 2
913
+ && receivedMessages .get ("2" + partitions .get (2 )).get () == messageCount * 2 );
914
+
915
+ consumer2 .close ();
916
+
917
+ assertThat (messageWaveCount ).hasValue (5 );
918
+ assertThat (
919
+ receivedMessages .values ().stream ()
920
+ .map (AtomicInteger ::get )
921
+ .mapToInt (Integer ::intValue )
922
+ .sum ())
923
+ .isEqualTo (messageCount * partitionCount * 5 );
924
+ int expectedMessageCountPerPartition = messageCount * messageWaveCount .get ();
925
+ receivedMessagesPerPartitions
926
+ .values ()
927
+ .forEach (v -> assertThat (v ).hasValue (expectedMessageCountPerPartition ));
928
+ Client c = cf .get ();
929
+ partitions .forEach (
930
+ partition ->
931
+ assertThat (lastReceivedOffsets .get (partition ))
932
+ .isGreaterThanOrEqualTo (expectedMessageCountPerPartition - 1 ));
933
+ }
764
934
}
0 commit comments