@@ -180,72 +180,84 @@ static RetentionTestConfig[] retention() {
180
180
@ ParameterizedTest
181
181
@ MethodSource
182
182
void retention (RetentionTestConfig configuration , TestInfo info ) throws Exception {
183
- String testStream = streamName (info );
183
+ // this test is flaky in some CI environments, so we have to retry it
184
+ int attemptCount = 0 ;
185
+ int maxAttempts = 3 ;
186
+ while (attemptCount <= maxAttempts ) {
187
+ attemptCount ++;
188
+ String testStream = streamName (info );
184
189
185
- Client client = cf .get ();
186
- try {
187
- configuration .streamCreator .accept (new Object [] {client , testStream });
190
+ Client client = cf .get ();
191
+ try {
192
+ configuration .streamCreator .accept (new Object [] {client , testStream });
188
193
189
- AtomicLong publishSequence = new AtomicLong (0 );
190
- Runnable publish =
191
- () -> {
192
- byte [] payload = new byte [payloadSize ];
193
- TestUtils .publishAndWaitForConfirms (
194
- cf ,
195
- messageBuilder ->
196
- messageBuilder
197
- .properties ()
198
- .messageId (publishSequence .getAndIncrement ())
199
- .messageBuilder ()
200
- .addData (payload )
201
- .build (),
202
- messageCount ,
203
- testStream ,
204
- Duration .ofSeconds (20 ));
205
- };
194
+ AtomicLong publishSequence = new AtomicLong (0 );
195
+ Runnable publish =
196
+ () -> {
197
+ byte [] payload = new byte [payloadSize ];
198
+ TestUtils .publishAndWaitForConfirms (
199
+ cf ,
200
+ messageBuilder ->
201
+ messageBuilder
202
+ .properties ()
203
+ .messageId (publishSequence .getAndIncrement ())
204
+ .messageBuilder ()
205
+ .addData (payload )
206
+ .build (),
207
+ messageCount ,
208
+ testStream ,
209
+ Duration .ofSeconds (20 ));
210
+ };
206
211
207
- publish .run ();
208
- configuration .waiting ();
209
- // publishing again, to make sure new segments trigger retention strategy
210
- publish .run ();
212
+ publish .run ();
213
+ configuration .waiting ();
214
+ // publishing again, to make sure new segments trigger retention strategy
215
+ publish .run ();
211
216
212
- CountDownLatch consumingLatch = new CountDownLatch (1 );
213
- AtomicLong firstMessageId = new AtomicLong (-1 );
214
- AtomicLong lastMessageId = new AtomicLong (-1 );
215
- Client consumer =
216
- cf .get (
217
- new Client .ClientParameters ()
218
- .chunkListener (
219
- (client1 , subscriptionId , offset , messageCount1 , dataSize ) ->
220
- client1 .credit (subscriptionId , 1 ))
221
- .messageListener (
222
- (subscriptionId , offset , chunkTimestamp , committedOffset , message ) -> {
223
- long messageId = message .getProperties ().getMessageIdAsLong ();
224
- firstMessageId .compareAndSet (-1 , messageId );
225
- lastMessageId .set (messageId );
226
- if (messageId == publishSequence .get () - 1 ) {
227
- consumingLatch .countDown ();
228
- }
229
- }));
217
+ CountDownLatch consumingLatch = new CountDownLatch (1 );
218
+ AtomicLong firstMessageId = new AtomicLong (-1 );
219
+ AtomicLong lastMessageId = new AtomicLong (-1 );
220
+ Client consumer =
221
+ cf .get (
222
+ new Client .ClientParameters ()
223
+ .chunkListener (
224
+ (client1 , subscriptionId , offset , messageCount1 , dataSize ) ->
225
+ client1 .credit (subscriptionId , 1 ))
226
+ .messageListener (
227
+ (subscriptionId , offset , chunkTimestamp , committedOffset , message ) -> {
228
+ long messageId = message .getProperties ().getMessageIdAsLong ();
229
+ firstMessageId .compareAndSet (-1 , messageId );
230
+ lastMessageId .set (messageId );
231
+ if (messageId == publishSequence .get () - 1 ) {
232
+ consumingLatch .countDown ();
233
+ }
234
+ }));
230
235
231
- consumer .subscribe (b (1 ), testStream , OffsetSpecification .first (), 10 );
232
- assertThat (consumingLatch .await (10 , SECONDS ))
233
- .as (
234
- () ->
235
- String .format (
236
- "Failure '%s', first message ID %d, last message ID %d, publish sequence %d" ,
237
- configuration .description ,
238
- firstMessageId .get (),
239
- lastMessageId .get (),
240
- publishSequence .get ()))
241
- .isTrue ();
242
- consumer .unsubscribe (b (1 ));
243
- assertThat (configuration .firstMessageIdAssertion .test (firstMessageId .get ()))
244
- .as (configuration .assertionDescription .apply (firstMessageId .get ()))
245
- .isTrue ();
246
- } finally {
247
- client .delete (testStream );
248
- configuration .clean ();
236
+ consumer .subscribe (b (1 ), testStream , OffsetSpecification .first (), 10 );
237
+ assertThat (consumingLatch .await (10 , SECONDS ))
238
+ .as (
239
+ () ->
240
+ String .format (
241
+ "Failure '%s', first message ID %d, last message ID %d, publish sequence %d" ,
242
+ configuration .description ,
243
+ firstMessageId .get (),
244
+ lastMessageId .get (),
245
+ publishSequence .get ()))
246
+ .isTrue ();
247
+ consumer .unsubscribe (b (1 ));
248
+ assertThat (configuration .firstMessageIdAssertion .test (firstMessageId .get ()))
249
+ .as (configuration .assertionDescription .apply (firstMessageId .get ()))
250
+ .isTrue ();
251
+ attemptCount = Integer .MAX_VALUE ;
252
+ } catch (AssertionError e ) {
253
+ // if too many attempts, fail the test, otherwise, try again
254
+ if (attemptCount > maxAttempts ) {
255
+ throw e ;
256
+ }
257
+ } finally {
258
+ client .delete (testStream );
259
+ configuration .clean ();
260
+ }
249
261
}
250
262
}
251
263
0 commit comments