18
18
19
19
import com .rabbitmq .stream .*;
20
20
import io .netty .channel .EventLoopGroup ;
21
+ import java .time .Duration ;
21
22
import java .util .*;
22
23
import java .util .concurrent .ConcurrentHashMap ;
23
24
import java .util .concurrent .CountDownLatch ;
37
38
@ ExtendWith (TestUtils .StreamTestInfrastructureExtension .class )
38
39
public class FilteringTest {
39
40
41
+ private static final Duration CONDITION_TIMEOUT = Duration .ofSeconds (5 );
42
+
40
43
static final int messageCount = 10_000 ;
41
44
42
45
EventLoopGroup eventLoopGroup ;
@@ -62,105 +65,122 @@ void tearDown() throws Exception {
62
65
@ ValueSource (strings = "foo" )
63
66
@ NullSource
64
67
void publishConsume (String producerName ) throws Exception {
65
- List <String > filterValues = new ArrayList <>(Arrays .asList ("apple" , "banana" , "pear" ));
66
- Map <String , AtomicInteger > filterValueCount = new HashMap <>();
67
- Random random = new Random ();
68
-
69
- Runnable insert =
70
- () ->
71
- publish (
72
- messageCount ,
73
- producerName ,
74
- () -> {
75
- String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
76
- filterValueCount
77
- .computeIfAbsent (filterValue , k -> new AtomicInteger ())
78
- .incrementAndGet ();
79
- return filterValue ;
80
- });
81
- insert .run ();
82
-
83
- // second wave of messages, with only one, new filter value
84
- String newFilterValue = "orange" ;
85
- filterValues .clear ();
86
- filterValues .add (newFilterValue );
87
- insert .run ();
88
-
89
- AtomicInteger receivedMessageCount = new AtomicInteger (0 );
90
- AtomicInteger filteredConsumedMessageCount = new AtomicInteger (0 );
91
- consumerBuilder ()
92
- .filter ()
93
- .values (newFilterValue )
94
- .postFilter (
95
- m -> {
96
- receivedMessageCount .incrementAndGet ();
97
- return newFilterValue .equals (m .getProperties ().getGroupId ());
98
- })
99
- .builder ()
100
- .messageHandler ((context , message ) -> filteredConsumedMessageCount .incrementAndGet ())
101
- .build ();
102
-
103
- int expectedCount = filterValueCount .get (newFilterValue ).get ();
104
- waitAtMost (() -> filteredConsumedMessageCount .get () == expectedCount );
105
- assertThat (receivedMessageCount ).hasValueLessThan (messageCount * 2 );
68
+ repeatIfFailure (
69
+ () -> {
70
+ List <String > filterValues = new ArrayList <>(Arrays .asList ("apple" , "banana" , "pear" ));
71
+ Map <String , AtomicInteger > filterValueCount = new HashMap <>();
72
+ Random random = new Random ();
73
+
74
+ Runnable insert =
75
+ () ->
76
+ publish (
77
+ messageCount ,
78
+ producerName ,
79
+ () -> {
80
+ String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
81
+ filterValueCount
82
+ .computeIfAbsent (filterValue , k -> new AtomicInteger ())
83
+ .incrementAndGet ();
84
+ return filterValue ;
85
+ });
86
+ insert .run ();
87
+
88
+ // second wave of messages, with only one, new filter value
89
+ String newFilterValue = "orange" ;
90
+ filterValues .clear ();
91
+ filterValues .add (newFilterValue );
92
+ insert .run ();
93
+
94
+ AtomicInteger receivedMessageCount = new AtomicInteger (0 );
95
+ AtomicInteger filteredConsumedMessageCount = new AtomicInteger (0 );
96
+ try (Consumer ignored =
97
+ consumerBuilder ()
98
+ .filter ()
99
+ .values (newFilterValue )
100
+ .postFilter (
101
+ m -> {
102
+ receivedMessageCount .incrementAndGet ();
103
+ return newFilterValue .equals (m .getProperties ().getGroupId ());
104
+ })
105
+ .builder ()
106
+ .messageHandler (
107
+ (context , message ) -> filteredConsumedMessageCount .incrementAndGet ())
108
+ .build ()) {
109
+ int expectedCount = filterValueCount .get (newFilterValue ).get ();
110
+ waitAtMost (
111
+ CONDITION_TIMEOUT , () -> filteredConsumedMessageCount .get () == expectedCount );
112
+ assertThat (receivedMessageCount ).hasValueLessThan (messageCount * 2 );
113
+ }
114
+ });
106
115
}
107
116
108
117
@ ParameterizedTest
109
118
@ ValueSource (strings = "foo" )
110
119
@ NullSource
111
- void publishWithNullFilterValuesShouldBePossible (String producerName ) {
112
- publish (messageCount , producerName , () -> null );
120
+ void publishWithNullFilterValuesShouldBePossible (String producerName ) throws Exception {
121
+ repeatIfFailure (
122
+ () -> {
123
+ publish (messageCount , producerName , () -> null );
113
124
114
- CountDownLatch consumeLatch = new CountDownLatch (messageCount );
115
- consumerBuilder ().messageHandler ((ctx , msg ) -> consumeLatch .countDown ()).build ();
116
- latchAssert (consumeLatch ).completes ();
125
+ CountDownLatch consumeLatch = new CountDownLatch (messageCount );
126
+ try (Consumer ignored =
127
+ consumerBuilder ().messageHandler ((ctx , msg ) -> consumeLatch .countDown ()).build ()) {
128
+ latchAssert (consumeLatch ).completes (CONDITION_TIMEOUT );
129
+ }
130
+ });
117
131
}
118
132
119
133
@ ParameterizedTest
120
134
@ CsvSource ({"foo,true" , "foo,false" , ",true" , ",false" })
121
135
void matchUnfilteredShouldReturnNullFilteredValueAndFilteredValues (
122
136
String producerName , boolean matchUnfiltered ) throws Exception {
123
- publish (messageCount , producerName , () -> null );
124
-
125
- List <String > filterValues = new ArrayList <>(Arrays .asList ("apple" , "banana" , "pear" ));
126
- Map <String , AtomicInteger > filterValueCount = new HashMap <>();
127
- Random random = new Random ();
128
- publish (
129
- messageCount ,
130
- producerName ,
137
+ repeatIfFailure (
131
138
() -> {
132
- String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
133
- filterValueCount .computeIfAbsent (filterValue , k -> new AtomicInteger ()).incrementAndGet ();
134
- return filterValue ;
139
+ publish (messageCount , producerName , () -> null );
140
+
141
+ List <String > filterValues = new ArrayList <>(Arrays .asList ("apple" , "banana" , "pear" ));
142
+ Map <String , AtomicInteger > filterValueCount = new HashMap <>();
143
+ Random random = new Random ();
144
+ publish (
145
+ messageCount ,
146
+ producerName ,
147
+ () -> {
148
+ String filterValue = filterValues .get (random .nextInt (filterValues .size ()));
149
+ filterValueCount
150
+ .computeIfAbsent (filterValue , k -> new AtomicInteger ())
151
+ .incrementAndGet ();
152
+ return filterValue ;
153
+ });
154
+
155
+ publish (messageCount , producerName , () -> null );
156
+
157
+ AtomicInteger receivedMessageCount = new AtomicInteger (0 );
158
+ Set <String > receivedFilterValues = ConcurrentHashMap .newKeySet ();
159
+ try (Consumer ignored =
160
+ consumerBuilder ()
161
+ .filter ()
162
+ .values (filterValues .get (0 ))
163
+ .matchUnfiltered (matchUnfiltered )
164
+ .postFilter (m -> true )
165
+ .builder ()
166
+ .messageHandler (
167
+ (ctx , msg ) -> {
168
+ receivedFilterValues .add (
169
+ msg .getProperties ().getGroupId () == null
170
+ ? "null"
171
+ : msg .getProperties ().getGroupId ());
172
+ receivedMessageCount .incrementAndGet ();
173
+ })
174
+ .build ()) {
175
+ int expected ;
176
+ if (matchUnfiltered ) {
177
+ expected = messageCount * 2 ;
178
+ } else {
179
+ expected = messageCount ;
180
+ }
181
+ waitAtMost (CONDITION_TIMEOUT , () -> receivedMessageCount .get () >= expected );
182
+ }
135
183
});
136
-
137
- publish (messageCount , producerName , () -> null );
138
-
139
- AtomicInteger receivedMessageCount = new AtomicInteger (0 );
140
- Set <String > receivedFilterValues = ConcurrentHashMap .newKeySet ();
141
- consumerBuilder ()
142
- .filter ()
143
- .values (filterValues .get (0 ))
144
- .matchUnfiltered (matchUnfiltered )
145
- .postFilter (m -> true )
146
- .builder ()
147
- .messageHandler (
148
- (ctx , msg ) -> {
149
- receivedFilterValues .add (
150
- msg .getProperties ().getGroupId () == null
151
- ? "null"
152
- : msg .getProperties ().getGroupId ());
153
- receivedMessageCount .incrementAndGet ();
154
- })
155
- .build ();
156
-
157
- int expected ;
158
- if (matchUnfiltered ) {
159
- expected = messageCount * 2 ;
160
- } else {
161
- expected = messageCount ;
162
- }
163
- waitAtMost (() -> receivedMessageCount .get () >= expected );
164
184
}
165
185
166
186
private ProducerBuilder producerBuilder () {
@@ -194,7 +214,22 @@ private void publish(
194
214
.messageBuilder ()
195
215
.build (),
196
216
confirmationHandler ));
197
- latchAssert (latch ).completes ();
217
+ latchAssert (latch ).completes (CONDITION_TIMEOUT );
198
218
producer .close ();
199
219
}
220
+
221
+ private static void repeatIfFailure (RunnableWithException test ) throws Exception {
222
+ int executionCount = 0 ;
223
+ Exception lastException = null ;
224
+ while (executionCount < 5 ) {
225
+ try {
226
+ test .run ();
227
+ return ;
228
+ } catch (Exception e ) {
229
+ executionCount ++;
230
+ lastException = e ;
231
+ }
232
+ }
233
+ throw lastException ;
234
+ }
200
235
}
0 commit comments