Skip to content

Commit 08d63a4

Browse files
committed
Add test utils
1 parent 47c7909 commit 08d63a4

File tree

2 files changed

+29
-27
lines changed

2 files changed

+29
-27
lines changed

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

+2-23
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16-
import static com.rabbitmq.stream.impl.TestUtils.b;
17-
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
18-
import static com.rabbitmq.stream.impl.TestUtils.localhost;
19-
import static com.rabbitmq.stream.impl.TestUtils.streamName;
20-
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
21-
import static com.rabbitmq.stream.impl.TestUtils.waitMs;
16+
import static com.rabbitmq.stream.impl.TestUtils.*;
2217
import static java.lang.String.format;
2318
import static java.util.Collections.synchronizedList;
2419
import static org.assertj.core.api.Assertions.*;
@@ -227,23 +222,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
227222
Consumer consumer = consumerBuilder.build();
228223

229224
waitAtMost(() -> receivedMessageCount.get() >= processingLimit);
230-
int sameValueCount = 0;
231-
Duration timeout = Duration.ofSeconds(10);
232-
Duration waitTime = Duration.ofMillis(100);
233-
long waitedTime = 0;
234-
int lastValue = -1;
235-
while (sameValueCount < 10) {
236-
if (receivedMessageCount.get() == lastValue) {
237-
sameValueCount++;
238-
} else {
239-
lastValue = receivedMessageCount.get();
240-
}
241-
Thread.sleep(waitTime.toMillis());
242-
waitedTime += waitTime.toMillis();
243-
if (waitedTime > timeout.toMillis()) {
244-
fail("Consumption did not stop after %d seconds", timeout.getSeconds());
245-
}
246-
}
225+
waitUntilStable(receivedMessageCount::get);
247226

248227
assertThat(receivedMessageCount)
249228
.hasValueGreaterThanOrEqualTo(processingLimit)

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

+27-4
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,10 @@
6363
import java.util.concurrent.TimeUnit;
6464
import java.util.concurrent.atomic.AtomicLong;
6565
import java.util.concurrent.atomic.AtomicReference;
66-
import java.util.function.Consumer;
67-
import java.util.function.Function;
68-
import java.util.function.Predicate;
69-
import java.util.function.Supplier;
66+
import java.util.function.*;
7067
import java.util.stream.IntStream;
7168
import org.assertj.core.api.AssertDelegateTarget;
69+
import org.assertj.core.api.Assertions;
7270
import org.assertj.core.api.Condition;
7371
import org.junit.jupiter.api.Tag;
7472
import org.junit.jupiter.api.TestInfo;
@@ -966,4 +964,29 @@ static Client.ChunkListener credit() {
966964
return null;
967965
};
968966
}
967+
968+
static void waitUntilStable(LongSupplier value) {
969+
int sameValueCount = 0;
970+
Duration timeout = Duration.ofSeconds(10);
971+
Duration waitTime = Duration.ofMillis(100);
972+
long waitedTime = 0;
973+
long lastValue = -1;
974+
while (sameValueCount < 10) {
975+
if (value.getAsLong() == lastValue) {
976+
sameValueCount++;
977+
} else {
978+
lastValue = value.getAsLong();
979+
}
980+
try {
981+
Thread.sleep(waitTime.toMillis());
982+
} catch (InterruptedException e) {
983+
Thread.currentThread().interrupt();
984+
throw new RuntimeException(e);
985+
}
986+
waitedTime += waitTime.toMillis();
987+
if (waitedTime > timeout.toMillis()) {
988+
Assertions.fail("Did not stabilize after %d seconds", timeout.getSeconds());
989+
}
990+
}
991+
}
969992
}

0 commit comments

Comments
 (0)