Skip to content

Commit 00c0112

Browse files
committed
Use server-side offset tracking in SAC test
Instead of client-side in-memory tracking, because it's more realistic. References rabbitmq/rabbitmq-server#3753 Conflicts: src/test/java/com/rabbitmq/stream/impl/SingleActiveConsumerTest.java
1 parent ba54cf3 commit 00c0112

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

src/test/java/com/rabbitmq/stream/impl/SingleActiveConsumerTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@ public class SingleActiveConsumerTest {
3838

3939
@Test
4040
void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exception {
41+
Client writerClient = cf.get();
4142
int messageCount = 10000;
4243
AtomicLong lastReceivedOffset = new AtomicLong(0);
4344
Map<Byte, Boolean> consumerStates = new ConcurrentHashMap<>();
4445
Map<Byte, AtomicInteger> receivedMessages = new ConcurrentHashMap<>();
4546
receivedMessages.put(b(0), new AtomicInteger(0));
4647
receivedMessages.put(b(1), new AtomicInteger(0));
4748
CountDownLatch consumerUpdateLatch = new CountDownLatch(2);
49+
String consumerName = "foo";
4850
ClientParameters clientParameters =
4951
new ClientParameters()
5052
.chunkListener(
@@ -59,17 +61,17 @@ void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exceptio
5961
(client, subscriptionId, active) -> {
6062
consumerStates.put(subscriptionId, active);
6163
consumerUpdateLatch.countDown();
62-
if (lastReceivedOffset.get() == 0) {
64+
long storedOffset = writerClient.queryOffset(consumerName, stream).getOffset();
65+
if (storedOffset == 0) {
6366
return OffsetSpecification.first();
6467
} else {
65-
return OffsetSpecification.offset(lastReceivedOffset.get() + 1);
68+
return OffsetSpecification.offset(storedOffset + 1);
6669
}
6770
});
6871
Client client = cf.get(clientParameters);
6972

7073
TestUtils.publishAndWaitForConfirms(cf, messageCount, stream);
7174

72-
String consumerName = "foo";
7375
Map<String, String> parameters = new HashMap<>();
7476
parameters.put("single-active-consumer", "true");
7577
parameters.put("name", consumerName);

0 commit comments

Comments
 (0)