19
19
import static java .lang .String .format ;
20
20
21
21
import com .google .common .util .concurrent .RateLimiter ;
22
+ import com .rabbitmq .client .Channel ;
23
+ import com .rabbitmq .client .Connection ;
22
24
import com .rabbitmq .stream .Address ;
23
25
import com .rabbitmq .stream .AddressResolver ;
24
26
import com .rabbitmq .stream .ByteCapacity ;
31
33
import com .rabbitmq .stream .Environment ;
32
34
import com .rabbitmq .stream .EnvironmentBuilder ;
33
35
import com .rabbitmq .stream .EnvironmentBuilder .TlsConfiguration ;
36
+ import com .rabbitmq .stream .MessageBuilder ;
34
37
import com .rabbitmq .stream .OffsetSpecification ;
35
38
import com .rabbitmq .stream .Producer ;
36
39
import com .rabbitmq .stream .ProducerBuilder ;
76
79
import java .util .concurrent .TimeUnit ;
77
80
import java .util .concurrent .atomic .AtomicInteger ;
78
81
import java .util .concurrent .atomic .AtomicLong ;
82
+ import java .util .concurrent .atomic .AtomicReference ;
79
83
import java .util .function .BiFunction ;
80
84
import java .util .function .Function ;
81
85
import java .util .function .Supplier ;
@@ -347,6 +351,30 @@ public class StreamPerfTest implements Callable<Integer> {
347
351
converter = Utils .PositiveIntegerTypeConverter .class )
348
352
private int rpcTimeout ;
349
353
354
+ @ CommandLine .Option (
355
+ names = {"--super-streams" , "-sst" },
356
+ description = "use super streams" ,
357
+ defaultValue = "false" )
358
+ private boolean superStreams ;
359
+
360
+ @ CommandLine .Option (
361
+ names = {"--super-streams-partitions" , "-ssp" },
362
+ description = "number of partitions for the super streams" ,
363
+ defaultValue = "3" ,
364
+ converter = Utils .PositiveIntegerTypeConverter .class )
365
+ private int superStreamsPartitions ;
366
+
367
+ @ CommandLine .Option (
368
+ names = {"--single-active-consumer" , "-sac" },
369
+ description = "use single active consumer" ,
370
+ defaultValue = "false" )
371
+ private boolean singleActiveConsumer ;
372
+
373
+ @ CommandLine .Option (
374
+ names = {"--amqp-uri" , "-au" },
375
+ description = "AMQP URI to use to create super stream topology" )
376
+ private String amqpUri ;
377
+
350
378
private MetricsCollector metricsCollector ;
351
379
private PerformanceMetrics performanceMetrics ;
352
380
private List <Monitoring > monitorings ;
@@ -601,33 +629,32 @@ public Integer call() throws Exception {
601
629
602
630
streams = Utils .streams (this .streamCount , this .streams );
603
631
604
- for (String stream : streams ) {
605
- StreamCreator streamCreator =
606
- environment .streamCreator ().stream (stream )
607
- .maxLengthBytes (this .maxLengthBytes )
608
- .maxSegmentSizeBytes (this .maxSegmentSize )
609
- .leaderLocator (this .leaderLocator );
610
-
611
- if (this .maxAge != null ) {
612
- streamCreator .maxAge (this .maxAge );
632
+ AtomicReference <Channel > amqpChannel = new AtomicReference <>();
633
+ Connection amqpConnection ;
634
+ if (this .superStreams ) {
635
+ amqpConnection = Utils .amqpConnection (this .amqpUri , uris , tls , this .sniServerNames );
636
+ if (this .deleteStreams ) {
637
+ // we keep it open for deletion, so adding a close step
638
+ shutdownService .wrap (
639
+ closeStep ("Closing AMQP connection for super streams" , () -> amqpConnection .close ()));
613
640
}
641
+ amqpChannel .set (amqpConnection .createChannel ());
642
+ } else {
643
+ amqpConnection = null ;
644
+ }
614
645
615
- try {
616
- streamCreator .create ();
617
- } catch (StreamException e ) {
618
- if (e .getCode () == Constants .RESPONSE_CODE_PRECONDITION_FAILED ) {
619
- String message =
620
- String .format (
621
- "Warning: stream '%s' already exists, but with different properties than "
622
- + "max-length-bytes=%s, stream-max-segment-size-bytes=%s, queue-leader-locator=%s" ,
623
- stream , this .maxLengthBytes , this .maxSegmentSize , this .leaderLocator );
624
- if (this .maxAge != null ) {
625
- message += String .format (", max-age=%s" , this .maxAge );
626
- }
627
- this .out .println (message );
628
- } else {
629
- throw e ;
646
+ for (String stream : streams ) {
647
+ if (this .superStreams ) {
648
+ List <String > partitions =
649
+ Utils .superStreamPartitions (stream , this .superStreamsPartitions );
650
+ for (String partition : partitions ) {
651
+ createStream (environment , partition );
630
652
}
653
+
654
+ Utils .declareSuperStreamExchangeAndBindings (amqpChannel .get (), stream , partitions );
655
+
656
+ } else {
657
+ createStream (environment , stream );
631
658
}
632
659
}
633
660
@@ -637,22 +664,32 @@ public Integer call() throws Exception {
637
664
"Deleting stream(s)" ,
638
665
() -> {
639
666
for (String stream : streams ) {
640
- LOGGER .debug ("Deleting {}" , stream );
641
- try {
642
- environment .deleteStream (stream );
643
- LOGGER .debug ("Deleted {}" , stream );
644
- } catch (Exception e ) {
645
- LOGGER .warn ("Could not delete stream {}: {}" , stream , e .getMessage ());
667
+ if (this .superStreams ) {
668
+ List <String > partitions =
669
+ Utils .superStreamPartitions (stream , this .superStreamsPartitions );
670
+ for (String partition : partitions ) {
671
+ environment .deleteStream (partition );
672
+ }
673
+ Utils .deleteSuperStreamExchange (amqpChannel .get (), stream );
674
+
675
+ } else {
676
+ LOGGER .debug ("Deleting {}" , stream );
677
+ try {
678
+ environment .deleteStream (stream );
679
+ LOGGER .debug ("Deleted {}" , stream );
680
+ } catch (Exception e ) {
681
+ LOGGER .warn ("Could not delete stream {}: {}" , stream , e .getMessage ());
682
+ }
646
683
}
647
684
}
648
685
}));
686
+ } else {
687
+ if (this .superStreams ) {
688
+ // we don't want to delete the super streams at the end, so we close the AMQP connection
689
+ amqpConnection .close ();
690
+ }
649
691
}
650
692
651
- // FIXME handle metadata update for consumers and publishers
652
- // they should at least issue a warning that their stream has been deleted and that they're
653
- // now
654
- // useless
655
-
656
693
List <Producer > producers = Collections .synchronizedList (new ArrayList <>(this .producers ));
657
694
List <Runnable > producerRunnables =
658
695
IntStream .range (0 , this .producers )
@@ -675,6 +712,16 @@ public Integer call() throws Exception {
675
712
producerBuilder .name (producerName ).confirmTimeout (Duration .ZERO );
676
713
}
677
714
715
+ java .util .function .Consumer <MessageBuilder > messageBuilderConsumer ;
716
+ if (this .superStreams ) {
717
+ producerBuilder .routing (msg -> msg .getProperties ().getMessageIdAsString ());
718
+ AtomicLong messageIdSequence = new AtomicLong (0 );
719
+ messageBuilderConsumer =
720
+ mg -> mg .properties ().messageId (messageIdSequence .getAndIncrement ());
721
+ } else {
722
+ messageBuilderConsumer = mg -> {};
723
+ }
724
+
678
725
Producer producer =
679
726
producerBuilder
680
727
.subEntrySize (this .subEntrySize )
@@ -707,9 +754,10 @@ public Integer call() throws Exception {
707
754
long creationTime = System .currentTimeMillis ();
708
755
byte [] payload = new byte [msgSize ];
709
756
Utils .writeLong (payload , creationTime );
757
+ MessageBuilder messageBuilder = producer .messageBuilder ();
758
+ messageBuilderConsumer .accept (messageBuilder );
710
759
producer .send (
711
- producer .messageBuilder ().addData (payload ).build (),
712
- confirmationHandler );
760
+ messageBuilder .addData (payload ).build (), confirmationHandler );
713
761
}
714
762
} catch (Exception e ) {
715
763
if (e instanceof InterruptedException
@@ -734,7 +782,21 @@ public Integer call() throws Exception {
734
782
AtomicLong messageCount = new AtomicLong (0 );
735
783
String stream = stream (streams , i );
736
784
ConsumerBuilder consumerBuilder = environment .consumerBuilder ();
737
- consumerBuilder = consumerBuilder .stream (stream ).offset (this .offset );
785
+ consumerBuilder = consumerBuilder .offset (this .offset );
786
+
787
+ if (this .superStreams ) {
788
+ consumerBuilder .superStream (stream );
789
+ } else {
790
+ consumerBuilder .stream (stream );
791
+ }
792
+
793
+ if (this .singleActiveConsumer ) {
794
+ consumerBuilder .singleActiveConsumer ();
795
+ // single active consumer requires a name
796
+ if (this .storeEvery == 0 ) {
797
+ this .storeEvery = 10_000 ;
798
+ }
799
+ }
738
800
739
801
if (this .storeEvery > 0 ) {
740
802
String consumerName = this .consumerNameStrategy .apply (stream , i + 1 );
@@ -831,6 +893,36 @@ public Integer call() throws Exception {
831
893
return 0 ;
832
894
}
833
895
896
+ private void createStream (Environment environment , String stream ) {
897
+ StreamCreator streamCreator =
898
+ environment .streamCreator ().stream (stream )
899
+ .maxLengthBytes (this .maxLengthBytes )
900
+ .maxSegmentSizeBytes (this .maxSegmentSize )
901
+ .leaderLocator (this .leaderLocator );
902
+
903
+ if (this .maxAge != null ) {
904
+ streamCreator .maxAge (this .maxAge );
905
+ }
906
+
907
+ try {
908
+ streamCreator .create ();
909
+ } catch (StreamException e ) {
910
+ if (e .getCode () == Constants .RESPONSE_CODE_PRECONDITION_FAILED ) {
911
+ String message =
912
+ String .format (
913
+ "Warning: stream '%s' already exists, but with different properties than "
914
+ + "max-length-bytes=%s, stream-max-segment-size-bytes=%s, queue-leader-locator=%s" ,
915
+ stream , this .maxLengthBytes , this .maxSegmentSize , this .leaderLocator );
916
+ if (this .maxAge != null ) {
917
+ message += String .format (", max-age=%s" , this .maxAge );
918
+ }
919
+ this .out .println (message );
920
+ } else {
921
+ throw e ;
922
+ }
923
+ }
924
+ }
925
+
834
926
private void overridePropertiesWithEnvironmentVariables () throws Exception {
835
927
Function <String , String > optionToEnvMappings =
836
928
OPTION_TO_ENVIRONMENT_VARIABLE
0 commit comments