@@ -431,6 +431,18 @@ public class StreamPerfTest implements Callable<Integer> {
431
431
@ ArgGroup (exclusive = false , multiplicity = "0..1" )
432
432
InstanceSyncOptions instanceSyncOptions ;
433
433
434
+ @ CommandLine .Option (
435
+ names = {"--filter-value-set" , "-fvs" },
436
+ description = "filter value set for publishers, range (e.g. 1..15) are accepted" ,
437
+ converter = Utils .FilterValueSetConverter .class )
438
+ private List <String > filterValueSet ;
439
+
440
+ @ CommandLine .Option (
441
+ names = {"--filter-values" , "-fv" },
442
+ description = "filter values for consumers" ,
443
+ split = "," )
444
+ private List <String > filterValues ;
445
+
434
446
static class InstanceSyncOptions {
435
447
436
448
@ CommandLine .Option (
@@ -588,7 +600,6 @@ public Integer call() throws Exception {
588
600
maybeDisplayVersion ();
589
601
maybeDisplayEnvironmentVariablesHelp ();
590
602
overridePropertiesWithEnvironmentVariables ();
591
-
592
603
Codec codec = createCodec (this .codecClass );
593
604
594
605
ByteBufAllocator byteBufAllocator = ByteBufAllocator .DEFAULT ;
@@ -875,19 +886,43 @@ public Integer call() throws Exception {
875
886
producerBuilder .name (producerName ).confirmTimeout (Duration .ZERO );
876
887
}
877
888
878
- java .util .function .Consumer <MessageBuilder > messageBuilderConsumer ;
889
+ java .util .function .Consumer <MessageBuilder > messageBuilderConsumerTemp ;
879
890
if (this .superStreams ) {
880
891
producerBuilder
881
892
.superStream (stream )
882
893
.routing (msg -> msg .getProperties ().getMessageIdAsString ());
883
894
AtomicLong messageIdSequence = new AtomicLong (0 );
884
- messageBuilderConsumer =
895
+ messageBuilderConsumerTemp =
885
896
mg -> mg .properties ().messageId (messageIdSequence .getAndIncrement ());
886
897
} else {
887
- messageBuilderConsumer = mg -> {};
898
+ messageBuilderConsumerTemp = mg -> {};
888
899
producerBuilder .stream (stream );
889
900
}
890
901
902
+ if (this .filterValueSet != null && this .filterValueSet .size () > 0 ) {
903
+ producerBuilder =
904
+ producerBuilder .filterValue (msg -> msg .getProperties ().getTo ());
905
+ List <String > values = new ArrayList <>(this .filterValueSet );
906
+ AtomicInteger count = new AtomicInteger ();
907
+ int subSetSize = Utils .filteringSubSetSize (values .size ());
908
+ int messageCountCycle = Utils .filteringPublishingCycle (this .rate );
909
+ List <String > subSet = new ArrayList <>(subSetSize );
910
+ java .util .function .Consumer <MessageBuilder > filteringMessageBuilderConsumer =
911
+ b -> {
912
+ if (Integer .remainderUnsigned (
913
+ count .getAndIncrement (), messageCountCycle )
914
+ == 0 ) {
915
+ Collections .shuffle (values );
916
+ subSet .clear ();
917
+ subSet .addAll (values .subList (0 , subSetSize ));
918
+ }
919
+ b .properties ()
920
+ .to (subSet .get (Integer .remainderUnsigned (count .get (), subSetSize )));
921
+ };
922
+ messageBuilderConsumerTemp =
923
+ messageBuilderConsumerTemp .andThen (filteringMessageBuilderConsumer );
924
+ }
925
+
891
926
Producer producer =
892
927
producerBuilder
893
928
.subEntrySize (this .subEntrySize )
@@ -897,9 +932,9 @@ public Integer call() throws Exception {
897
932
.maxUnconfirmedMessages (this .confirms )
898
933
.build ();
899
934
900
- AtomicLong messageCount = new AtomicLong (0 );
901
935
ConfirmationHandler confirmationHandler ;
902
936
if (this .confirmLatency ) {
937
+ AtomicLong messageCount = new AtomicLong (0 );
903
938
final PerformanceMetrics metrics = this .performanceMetrics ;
904
939
final int divisor = Utils .downSamplingDivisor (this .rate );
905
940
confirmationHandler =
@@ -935,6 +970,8 @@ public Integer call() throws Exception {
935
970
936
971
producers .add (producer );
937
972
973
+ java .util .function .Consumer <MessageBuilder > messageBuilderConsumer =
974
+ messageBuilderConsumerTemp ;
938
975
return (Runnable )
939
976
() -> {
940
977
final int msgSize = this .messageSize ;
@@ -1050,6 +1087,8 @@ public Integer call() throws Exception {
1050
1087
}
1051
1088
});
1052
1089
1090
+ consumerBuilder = maybeConfigureForFiltering (consumerBuilder );
1091
+
1053
1092
Consumer consumer = consumerBuilder .build ();
1054
1093
return consumer ;
1055
1094
})
@@ -1126,6 +1165,37 @@ public Integer call() throws Exception {
1126
1165
return 0 ;
1127
1166
}
1128
1167
1168
+ private ConsumerBuilder maybeConfigureForFiltering (ConsumerBuilder consumerBuilder ) {
1169
+ if (this .filterValues != null && this .filterValues .size () > 0 ) {
1170
+ consumerBuilder =
1171
+ consumerBuilder .filter ().values (this .filterValues .toArray (new String [0 ])).builder ();
1172
+
1173
+ if (this .filterValues .size () == 1 ) {
1174
+ String filterValue = filterValues .get (0 );
1175
+ consumerBuilder =
1176
+ consumerBuilder
1177
+ .filter ()
1178
+ .postFilter (msg -> filterValue .equals (msg .getProperties ().getTo ()))
1179
+ .builder ();
1180
+ } else {
1181
+ consumerBuilder =
1182
+ consumerBuilder
1183
+ .filter ()
1184
+ .postFilter (
1185
+ msg -> {
1186
+ for (String filterValue : this .filterValues ) {
1187
+ if (filterValue .equals (msg .getProperties ().getTo ())) {
1188
+ return true ;
1189
+ }
1190
+ }
1191
+ return false ;
1192
+ })
1193
+ .builder ();
1194
+ }
1195
+ }
1196
+ return consumerBuilder ;
1197
+ }
1198
+
1129
1199
private void createStream (Environment environment , String stream ) {
1130
1200
StreamCreator streamCreator =
1131
1201
environment .streamCreator ().stream (stream )
0 commit comments