@@ -439,6 +439,18 @@ public class StreamPerfTest implements Callable<Integer> {
439
439
@ ArgGroup (exclusive = false , multiplicity = "0..1" )
440
440
InstanceSyncOptions instanceSyncOptions ;
441
441
442
+ @ CommandLine .Option (
443
+ names = {"--filter-value-set" , "-fvs" },
444
+ description = "filter value set for publishers, range (e.g. 1..15) are accepted" ,
445
+ converter = Utils .FilterValueSetConverter .class )
446
+ private List <String > filterValueSet ;
447
+
448
+ @ CommandLine .Option (
449
+ names = {"--filter-values" , "-fv" },
450
+ description = "filter values for consumers" ,
451
+ split = "," )
452
+ private List <String > filterValues ;
453
+
442
454
static class InstanceSyncOptions {
443
455
444
456
@ CommandLine .Option (
@@ -589,7 +601,6 @@ public Integer call() throws Exception {
589
601
maybeDisplayVersion ();
590
602
maybeDisplayEnvironmentVariablesHelp ();
591
603
overridePropertiesWithEnvironmentVariables ();
592
-
593
604
Codec codec = createCodec (this .codecClass );
594
605
595
606
ByteBufAllocator byteBufAllocator = ByteBufAllocator .DEFAULT ;
@@ -876,19 +887,43 @@ public Integer call() throws Exception {
876
887
producerBuilder .name (producerName ).confirmTimeout (Duration .ZERO );
877
888
}
878
889
879
- java .util .function .Consumer <MessageBuilder > messageBuilderConsumer ;
890
+ java .util .function .Consumer <MessageBuilder > messageBuilderConsumerTemp ;
880
891
if (this .superStreams ) {
881
892
producerBuilder
882
893
.superStream (stream )
883
894
.routing (msg -> msg .getProperties ().getMessageIdAsString ());
884
895
AtomicLong messageIdSequence = new AtomicLong (0 );
885
- messageBuilderConsumer =
896
+ messageBuilderConsumerTemp =
886
897
mg -> mg .properties ().messageId (messageIdSequence .getAndIncrement ());
887
898
} else {
888
- messageBuilderConsumer = mg -> {};
899
+ messageBuilderConsumerTemp = mg -> {};
889
900
producerBuilder .stream (stream );
890
901
}
891
902
903
+ if (this .filterValueSet != null && this .filterValueSet .size () > 0 ) {
904
+ producerBuilder =
905
+ producerBuilder .filterValue (msg -> msg .getProperties ().getTo ());
906
+ List <String > values = new ArrayList <>(this .filterValueSet );
907
+ AtomicInteger count = new AtomicInteger ();
908
+ int subSetSize = Utils .filteringSubSetSize (values .size ());
909
+ int messageCountCycle = Utils .filteringPublishingCycle (this .rate );
910
+ List <String > subSet = new ArrayList <>(subSetSize );
911
+ java .util .function .Consumer <MessageBuilder > filteringMessageBuilderConsumer =
912
+ b -> {
913
+ if (Integer .remainderUnsigned (
914
+ count .getAndIncrement (), messageCountCycle )
915
+ == 0 ) {
916
+ Collections .shuffle (values );
917
+ subSet .clear ();
918
+ subSet .addAll (values .subList (0 , subSetSize ));
919
+ }
920
+ b .properties ()
921
+ .to (subSet .get (Integer .remainderUnsigned (count .get (), subSetSize )));
922
+ };
923
+ messageBuilderConsumerTemp =
924
+ messageBuilderConsumerTemp .andThen (filteringMessageBuilderConsumer );
925
+ }
926
+
892
927
Producer producer =
893
928
producerBuilder
894
929
.subEntrySize (this .subEntrySize )
@@ -898,9 +933,9 @@ public Integer call() throws Exception {
898
933
.maxUnconfirmedMessages (this .confirms )
899
934
.build ();
900
935
901
- AtomicLong messageCount = new AtomicLong (0 );
902
936
ConfirmationHandler confirmationHandler ;
903
937
if (this .confirmLatency ) {
938
+ AtomicLong messageCount = new AtomicLong (0 );
904
939
final PerformanceMetrics metrics = this .performanceMetrics ;
905
940
final int divisor = Utils .downSamplingDivisor (this .rate );
906
941
confirmationHandler =
@@ -936,6 +971,8 @@ public Integer call() throws Exception {
936
971
937
972
producers .add (producer );
938
973
974
+ java .util .function .Consumer <MessageBuilder > messageBuilderConsumer =
975
+ messageBuilderConsumerTemp ;
939
976
return (Runnable )
940
977
() -> {
941
978
final int msgSize = this .messageSize ;
@@ -1046,6 +1083,8 @@ public Integer call() throws Exception {
1046
1083
}
1047
1084
});
1048
1085
1086
+ consumerBuilder = maybeConfigureForFiltering (consumerBuilder );
1087
+
1049
1088
Consumer consumer = consumerBuilder .build ();
1050
1089
return consumer ;
1051
1090
})
@@ -1122,6 +1161,37 @@ public Integer call() throws Exception {
1122
1161
return 0 ;
1123
1162
}
1124
1163
1164
+ private ConsumerBuilder maybeConfigureForFiltering (ConsumerBuilder consumerBuilder ) {
1165
+ if (this .filterValues != null && this .filterValues .size () > 0 ) {
1166
+ consumerBuilder =
1167
+ consumerBuilder .filter ().values (this .filterValues .toArray (new String [0 ])).builder ();
1168
+
1169
+ if (this .filterValues .size () == 1 ) {
1170
+ String filterValue = filterValues .get (0 );
1171
+ consumerBuilder =
1172
+ consumerBuilder
1173
+ .filter ()
1174
+ .postFilter (msg -> filterValue .equals (msg .getProperties ().getTo ()))
1175
+ .builder ();
1176
+ } else {
1177
+ consumerBuilder =
1178
+ consumerBuilder
1179
+ .filter ()
1180
+ .postFilter (
1181
+ msg -> {
1182
+ for (String filterValue : this .filterValues ) {
1183
+ if (filterValue .equals (msg .getProperties ().getTo ())) {
1184
+ return true ;
1185
+ }
1186
+ }
1187
+ return false ;
1188
+ })
1189
+ .builder ();
1190
+ }
1191
+ }
1192
+ return consumerBuilder ;
1193
+ }
1194
+
1125
1195
private void createStream (Environment environment , String stream ) {
1126
1196
StreamCreator streamCreator =
1127
1197
environment .streamCreator ().stream (stream )
0 commit comments