19
19
import static java .lang .String .format ;
20
20
21
21
import com .google .common .util .concurrent .RateLimiter ;
22
+ import com .rabbitmq .client .Channel ;
23
+ import com .rabbitmq .client .Connection ;
22
24
import com .rabbitmq .stream .Address ;
23
25
import com .rabbitmq .stream .AddressResolver ;
24
26
import com .rabbitmq .stream .ByteCapacity ;
31
33
import com .rabbitmq .stream .Environment ;
32
34
import com .rabbitmq .stream .EnvironmentBuilder ;
33
35
import com .rabbitmq .stream .EnvironmentBuilder .TlsConfiguration ;
36
+ import com .rabbitmq .stream .MessageBuilder ;
34
37
import com .rabbitmq .stream .OffsetSpecification ;
35
38
import com .rabbitmq .stream .Producer ;
36
39
import com .rabbitmq .stream .ProducerBuilder ;
76
79
import java .util .concurrent .TimeUnit ;
77
80
import java .util .concurrent .atomic .AtomicInteger ;
78
81
import java .util .concurrent .atomic .AtomicLong ;
82
+ import java .util .concurrent .atomic .AtomicReference ;
79
83
import java .util .function .BiFunction ;
80
84
import java .util .function .Function ;
81
85
import java .util .function .Supplier ;
@@ -366,6 +370,30 @@ public void setMaxSegmentSize(ByteCapacity in) {
366
370
defaultValue = "false" )
367
371
private boolean confirmLatency ;
368
372
373
+ @ CommandLine .Option (
374
+ names = {"--super-streams" , "-sst" },
375
+ description = "use super streams" ,
376
+ defaultValue = "false" )
377
+ private boolean superStreams ;
378
+
379
+ @ CommandLine .Option (
380
+ names = {"--super-streams-partitions" , "-ssp" },
381
+ description = "number of partitions for the super streams" ,
382
+ defaultValue = "3" ,
383
+ converter = Utils .PositiveIntegerTypeConverter .class )
384
+ private int superStreamsPartitions ;
385
+
386
+ @ CommandLine .Option (
387
+ names = {"--single-active-consumer" , "-sac" },
388
+ description = "use single active consumer" ,
389
+ defaultValue = "false" )
390
+ private boolean singleActiveConsumer ;
391
+
392
+ @ CommandLine .Option (
393
+ names = {"--amqp-uri" , "-au" },
394
+ description = "AMQP URI to use to create super stream topology" )
395
+ private String amqpUri ;
396
+
369
397
private MetricsCollector metricsCollector ;
370
398
private PerformanceMetrics performanceMetrics ;
371
399
private List <Monitoring > monitorings ;
@@ -622,33 +650,32 @@ public Integer call() throws Exception {
622
650
623
651
streams = Utils .streams (this .streamCount , this .streams );
624
652
625
- for (String stream : streams ) {
626
- StreamCreator streamCreator =
627
- environment .streamCreator ().stream (stream )
628
- .maxLengthBytes (this .maxLengthBytes )
629
- .maxSegmentSizeBytes (this .maxSegmentSize )
630
- .leaderLocator (this .leaderLocator );
631
-
632
- if (this .maxAge != null ) {
633
- streamCreator .maxAge (this .maxAge );
653
+ AtomicReference <Channel > amqpChannel = new AtomicReference <>();
654
+ Connection amqpConnection ;
655
+ if (this .superStreams ) {
656
+ amqpConnection = Utils .amqpConnection (this .amqpUri , uris , tls , this .sniServerNames );
657
+ if (this .deleteStreams ) {
658
+ // we keep it open for deletion, so adding a close step
659
+ shutdownService .wrap (
660
+ closeStep ("Closing AMQP connection for super streams" , () -> amqpConnection .close ()));
634
661
}
662
+ amqpChannel .set (amqpConnection .createChannel ());
663
+ } else {
664
+ amqpConnection = null ;
665
+ }
635
666
636
- try {
637
- streamCreator .create ();
638
- } catch (StreamException e ) {
639
- if (e .getCode () == Constants .RESPONSE_CODE_PRECONDITION_FAILED ) {
640
- String message =
641
- String .format (
642
- "Warning: stream '%s' already exists, but with different properties than "
643
- + "max-length-bytes=%s, stream-max-segment-size-bytes=%s, queue-leader-locator=%s" ,
644
- stream , this .maxLengthBytes , this .maxSegmentSize , this .leaderLocator );
645
- if (this .maxAge != null ) {
646
- message += String .format (", max-age=%s" , this .maxAge );
647
- }
648
- this .out .println (message );
649
- } else {
650
- throw e ;
667
+ for (String stream : streams ) {
668
+ if (this .superStreams ) {
669
+ List <String > partitions =
670
+ Utils .superStreamPartitions (stream , this .superStreamsPartitions );
671
+ for (String partition : partitions ) {
672
+ createStream (environment , partition );
651
673
}
674
+
675
+ Utils .declareSuperStreamExchangeAndBindings (amqpChannel .get (), stream , partitions );
676
+
677
+ } else {
678
+ createStream (environment , stream );
652
679
}
653
680
}
654
681
@@ -658,15 +685,30 @@ public Integer call() throws Exception {
658
685
"Deleting stream(s)" ,
659
686
() -> {
660
687
for (String stream : streams ) {
661
- LOGGER .debug ("Deleting {}" , stream );
662
- try {
663
- environment .deleteStream (stream );
664
- LOGGER .debug ("Deleted {}" , stream );
665
- } catch (Exception e ) {
666
- LOGGER .warn ("Could not delete stream {}: {}" , stream , e .getMessage ());
688
+ if (this .superStreams ) {
689
+ List <String > partitions =
690
+ Utils .superStreamPartitions (stream , this .superStreamsPartitions );
691
+ for (String partition : partitions ) {
692
+ environment .deleteStream (partition );
693
+ }
694
+ Utils .deleteSuperStreamExchange (amqpChannel .get (), stream );
695
+
696
+ } else {
697
+ LOGGER .debug ("Deleting {}" , stream );
698
+ try {
699
+ environment .deleteStream (stream );
700
+ LOGGER .debug ("Deleted {}" , stream );
701
+ } catch (Exception e ) {
702
+ LOGGER .warn ("Could not delete stream {}: {}" , stream , e .getMessage ());
703
+ }
667
704
}
668
705
}
669
706
}));
707
+ } else {
708
+ if (this .superStreams ) {
709
+ // we don't want to delete the super streams at the end, so we close the AMQP connection
710
+ amqpConnection .close ();
711
+ }
670
712
}
671
713
672
714
List <Producer > producers = Collections .synchronizedList (new ArrayList <>(this .producers ));
@@ -691,6 +733,16 @@ public Integer call() throws Exception {
691
733
producerBuilder .name (producerName ).confirmTimeout (Duration .ZERO );
692
734
}
693
735
736
+ java .util .function .Consumer <MessageBuilder > messageBuilderConsumer ;
737
+ if (this .superStreams ) {
738
+ producerBuilder .routing (msg -> msg .getProperties ().getMessageIdAsString ());
739
+ AtomicLong messageIdSequence = new AtomicLong (0 );
740
+ messageBuilderConsumer =
741
+ mg -> mg .properties ().messageId (messageIdSequence .getAndIncrement ());
742
+ } else {
743
+ messageBuilderConsumer = mg -> {};
744
+ }
745
+
694
746
Producer producer =
695
747
producerBuilder
696
748
.subEntrySize (this .subEntrySize )
@@ -753,9 +805,10 @@ public Integer call() throws Exception {
753
805
long creationTime = System .currentTimeMillis ();
754
806
byte [] payload = new byte [msgSize ];
755
807
Utils .writeLong (payload , creationTime );
808
+ MessageBuilder messageBuilder = producer .messageBuilder ();
809
+ messageBuilderConsumer .accept (messageBuilder );
756
810
producer .send (
757
- producer .messageBuilder ().addData (payload ).build (),
758
- confirmationHandler );
811
+ messageBuilder .addData (payload ).build (), confirmationHandler );
759
812
}
760
813
} catch (Exception e ) {
761
814
if (e instanceof InterruptedException
@@ -780,7 +833,21 @@ public Integer call() throws Exception {
780
833
AtomicLong messageCount = new AtomicLong (0 );
781
834
String stream = stream (streams , i );
782
835
ConsumerBuilder consumerBuilder = environment .consumerBuilder ();
783
- consumerBuilder = consumerBuilder .stream (stream ).offset (this .offset );
836
+ consumerBuilder = consumerBuilder .offset (this .offset );
837
+
838
+ if (this .superStreams ) {
839
+ consumerBuilder .superStream (stream );
840
+ } else {
841
+ consumerBuilder .stream (stream );
842
+ }
843
+
844
+ if (this .singleActiveConsumer ) {
845
+ consumerBuilder .singleActiveConsumer ();
846
+ // single active consumer requires a name
847
+ if (this .storeEvery == 0 ) {
848
+ this .storeEvery = 10_000 ;
849
+ }
850
+ }
784
851
785
852
if (this .storeEvery > 0 ) {
786
853
String consumerName = this .consumerNameStrategy .apply (stream , i + 1 );
@@ -880,6 +947,36 @@ public Integer call() throws Exception {
880
947
return 0 ;
881
948
}
882
949
950
+ private void createStream (Environment environment , String stream ) {
951
+ StreamCreator streamCreator =
952
+ environment .streamCreator ().stream (stream )
953
+ .maxLengthBytes (this .maxLengthBytes )
954
+ .maxSegmentSizeBytes (this .maxSegmentSize )
955
+ .leaderLocator (this .leaderLocator );
956
+
957
+ if (this .maxAge != null ) {
958
+ streamCreator .maxAge (this .maxAge );
959
+ }
960
+
961
+ try {
962
+ streamCreator .create ();
963
+ } catch (StreamException e ) {
964
+ if (e .getCode () == Constants .RESPONSE_CODE_PRECONDITION_FAILED ) {
965
+ String message =
966
+ String .format (
967
+ "Warning: stream '%s' already exists, but with different properties than "
968
+ + "max-length-bytes=%s, stream-max-segment-size-bytes=%s, queue-leader-locator=%s" ,
969
+ stream , this .maxLengthBytes , this .maxSegmentSize , this .leaderLocator );
970
+ if (this .maxAge != null ) {
971
+ message += String .format (", max-age=%s" , this .maxAge );
972
+ }
973
+ this .out .println (message );
974
+ } else {
975
+ throw e ;
976
+ }
977
+ }
978
+ }
979
+
883
980
private void overridePropertiesWithEnvironmentVariables () throws Exception {
884
981
Function <String , String > optionToEnvMappings =
885
982
OPTION_TO_ENVIRONMENT_VARIABLE
0 commit comments