@@ -932,38 +932,50 @@ void streamStatsShouldReturnErrorWhenStreamDoesNotExist() {
932
932
@ Test
933
933
@ BrokerVersionAtLeast (BrokerVersion .RABBITMQ_3_11 )
934
934
void streamStatsFirstOffsetShouldChangeAfterRetentionKickedIn (TestInfo info ) {
935
- int messageCount = 1000 ;
936
- int payloadSize = 1000 ;
937
- String s = TestUtils .streamName (info );
938
- Client client = cf .get ();
939
- try {
940
- assertThat (
941
- client
942
- .create (
943
- s ,
944
- new Client .StreamParametersBuilder ()
945
- .maxLengthBytes (messageCount * payloadSize / 10 )
946
- .maxSegmentSizeBytes (messageCount * payloadSize / 20 )
947
- .build ())
948
- .isOk ())
949
- .isTrue ();
950
-
951
- StreamStatsResponse response = client .streamStats (s );
952
- assertThat (response .getInfo ()).containsEntry ("first_chunk_id" , -1L );
953
- assertThat (response .getInfo ()).containsEntry ("committed_chunk_id" , -1L );
954
-
955
- byte [] payload = new byte [payloadSize ];
956
- Function <MessageBuilder , Message > messageCreation = mb -> mb .addData (payload ).build ();
957
-
958
- TestUtils .publishAndWaitForConfirms (cf , messageCreation , messageCount , s );
959
- // publishing again, to make sure new segments trigger retention strategy
960
- TestUtils .publishAndWaitForConfirms (cf , messageCreation , messageCount , s );
961
- response = client .streamStats (s );
962
- assertThat (response .getInfo ().get ("first_chunk_id" )).isPositive ();
963
- assertThat (response .getInfo ().get ("committed_chunk_id" )).isPositive ();
964
-
965
- } finally {
966
- assertThat (client .delete (s ).isOk ()).isTrue ();
935
+ // this test is flaky in some CI environments, so we have to retry it
936
+ int attemptCount = 0 ;
937
+ int maxAttempts = 3 ;
938
+ while (attemptCount <= maxAttempts ) {
939
+ attemptCount ++;
940
+ int messageCount = 1000 ;
941
+ int payloadSize = 1000 ;
942
+ String s = TestUtils .streamName (info );
943
+ Client client = cf .get ();
944
+ try {
945
+ assertThat (
946
+ client
947
+ .create (
948
+ s ,
949
+ new Client .StreamParametersBuilder ()
950
+ .maxLengthBytes (messageCount * payloadSize / 10 )
951
+ .maxSegmentSizeBytes (messageCount * payloadSize / 20 )
952
+ .build ())
953
+ .isOk ())
954
+ .isTrue ();
955
+
956
+ StreamStatsResponse response = client .streamStats (s );
957
+ assertThat (response .getInfo ()).containsEntry ("first_chunk_id" , -1L );
958
+ assertThat (response .getInfo ()).containsEntry ("committed_chunk_id" , -1L );
959
+
960
+ byte [] payload = new byte [payloadSize ];
961
+ Function <MessageBuilder , Message > messageCreation = mb -> mb .addData (payload ).build ();
962
+
963
+ TestUtils .publishAndWaitForConfirms (cf , messageCreation , messageCount , s );
964
+ // publishing again, to make sure new segments trigger retention strategy
965
+ TestUtils .publishAndWaitForConfirms (cf , messageCreation , messageCount , s );
966
+ response = client .streamStats (s );
967
+ assertThat (response .getInfo ().get ("first_chunk_id" )).isPositive ();
968
+ assertThat (response .getInfo ().get ("committed_chunk_id" )).isPositive ();
969
+
970
+ attemptCount = Integer .MAX_VALUE ;
971
+ } catch (AssertionError e ) {
972
+ // if too many attempts, fail the test, otherwise, try again
973
+ if (attemptCount > maxAttempts ) {
974
+ throw e ;
975
+ }
976
+ } finally {
977
+ assertThat (client .delete (s ).isOk ()).isTrue ();
978
+ }
967
979
}
968
980
}
969
981
}
0 commit comments