Skip to content

Commit a840962

Browse files
committed
Add SAC super stream test with manual offset tracking
Using server-side offset tracking. References #46, rabbitmq/rabbitmq-server#3753
1 parent e5ae4a5 commit a840962

File tree

4 files changed

+273
-10
lines changed

4 files changed

+273
-10
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.rabbitmq.stream.Constants;
1818
import com.rabbitmq.stream.Consumer;
1919
import com.rabbitmq.stream.ConsumerUpdateListener;
20+
import com.rabbitmq.stream.ConsumerUpdateListener.Status;
2021
import com.rabbitmq.stream.MessageHandler;
2122
import com.rabbitmq.stream.MessageHandler.Context;
2223
import com.rabbitmq.stream.OffsetSpecification;
@@ -180,7 +181,7 @@ class StreamConsumer implements Consumer {
180181
}
181182
} else if (trackingConfiguration.manual()) {
182183
LOGGER.debug("Setting default consumer update listener for manual tracking strategy");
183-
this.consumerUpdateListener =
184+
ConsumerUpdateListener defaultListener =
184185
context -> {
185186
OffsetSpecification result = null;
186187
// we are not supposed to store offsets with manual tracking strategy
@@ -196,6 +197,7 @@ class StreamConsumer implements Consumer {
196197
}
197198
return result;
198199
};
200+
this.consumerUpdateListener = defaultListener;
199201
} else {
200202
// no consumer update listener to look up the offset, this is not what we want
201203
this.consumerUpdateListener = context -> null;

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

+20-7
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import com.rabbitmq.stream.Consumer;
17+
import com.rabbitmq.stream.ConsumerUpdateListener.Status;
1718
import com.rabbitmq.stream.Message;
1819
import com.rabbitmq.stream.MessageHandler;
1920
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
@@ -80,10 +81,14 @@ class SuperStreamConsumer implements Consumer {
8081
.builder();
8182
}
8283

83-
Consumer consumer =
84-
subConsumerBuilder.lazyInit(true).superStream(null).messageHandler(messageHandler).stream(
85-
partition)
86-
.build();
84+
StreamConsumer consumer =
85+
(StreamConsumer)
86+
subConsumerBuilder
87+
.lazyInit(true)
88+
.superStream(null)
89+
.messageHandler(messageHandler)
90+
.stream(partition)
91+
.build();
8792
consumers.put(partition, consumer);
8893
state.consumer = consumer;
8994
LOGGER.debug("Created consumer on stream '{}' for super stream '{}'", partition, superStream);
@@ -95,7 +100,7 @@ class SuperStreamConsumer implements Consumer {
95100
private static final class ConsumerState {
96101

97102
private volatile long offset = 0;
98-
private volatile Consumer consumer;
103+
private volatile StreamConsumer consumer;
99104
}
100105

101106
private static final class ManualOffsetTrackingMessageHandler implements MessageHandler {
@@ -129,13 +134,21 @@ public long timestamp() {
129134
public void storeOffset() {
130135
for (ConsumerState state : consumerStates) {
131136
if (ManualOffsetTrackingMessageHandler.this.consumerState == state) {
132-
context.storeOffset();
137+
maybeStoreOffset(state, () -> context.storeOffset());
133138
} else if (state.offset != 0) {
134-
state.consumer.store(state.offset);
139+
maybeStoreOffset(state, () -> state.consumer.store(state.offset));
135140
}
136141
}
137142
}
138143

144+
private void maybeStoreOffset(ConsumerState state, Runnable storeAction) {
145+
if (state.consumer.isSac() && state.consumer.sacStatus() != Status.ACTIVE) {
146+
// do nothing
147+
} else {
148+
storeAction.run();
149+
}
150+
}
151+
139152
@Override
140153
public String stream() {
141154
return context.stream();

src/test/java/com/rabbitmq/stream/impl/OffsetTrackingTest.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454
import java.util.stream.Collectors;
5555
import java.util.stream.IntStream;
5656
import java.util.stream.Stream;
57-
import org.junit.jupiter.api.Disabled;
5857
import org.junit.jupiter.api.Test;
5958
import org.junit.jupiter.api.TestInfo;
6059
import org.junit.jupiter.api.extension.ExtendWith;
@@ -107,13 +106,22 @@ void trackAndQuery(
107106
}
108107

109108
@Test
110-
@Disabled
111109
void shouldReturnNoOffsetIfNothingStoredForReference() {
112110
QueryOffsetResponse response = cf.get().queryOffset(UUID.randomUUID().toString(), stream);
113111
assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_NO_OFFSET));
114112
assertThat(response.getOffset()).isEqualTo(0);
115113
}
116114

115+
@Test
116+
void storedOffsetCanGoBackward() throws Exception {
117+
String reference = UUID.randomUUID().toString();
118+
Client client = cf.get();
119+
client.storeOffset(reference, stream, 100);
120+
waitAtMost(() -> client.queryOffset(reference, stream).getOffset() == 100);
121+
client.storeOffset(reference, stream, 50);
122+
waitAtMost(() -> client.queryOffset(reference, stream).getOffset() == 50);
123+
}
124+
117125
@ParameterizedTest
118126
@MethodSource
119127
void consumeAndStore(BiConsumer<String, Client> streamCreator, TestInfo info) throws Exception {

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

+240
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323

2424
import com.rabbitmq.client.Connection;
2525
import com.rabbitmq.client.ConnectionFactory;
26+
import com.rabbitmq.stream.Constants;
2627
import com.rabbitmq.stream.Consumer;
28+
import com.rabbitmq.stream.ConsumerUpdateListener;
2729
import com.rabbitmq.stream.ConsumerUpdateListener.Status;
2830
import com.rabbitmq.stream.Environment;
2931
import com.rabbitmq.stream.EnvironmentBuilder;
3032
import com.rabbitmq.stream.OffsetSpecification;
33+
import com.rabbitmq.stream.StreamException;
3134
import com.rabbitmq.stream.impl.Client.ClientParameters;
35+
import com.rabbitmq.stream.impl.TestUtils.CallableBooleanSupplier;
3236
import com.rabbitmq.stream.impl.TestUtils.SingleActiveConsumer;
3337
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
3438
import io.netty.channel.EventLoopGroup;
@@ -83,6 +87,30 @@ private static void publishToPartitions(
8387
latchAssert(publishLatch).completes();
8488
}
8589

90+
private static void waitUntil(CallableBooleanSupplier action) {
91+
try {
92+
waitAtMost(action);
93+
} catch (Exception e) {
94+
throw new RuntimeException(e);
95+
}
96+
}
97+
98+
private static OffsetSpecification lastStoredOffset(
99+
Consumer consumer, OffsetSpecification defaultValue) {
100+
OffsetSpecification offsetSpecification;
101+
try {
102+
long storedOffset = consumer.storedOffset();
103+
offsetSpecification = OffsetSpecification.offset(storedOffset);
104+
} catch (StreamException e) {
105+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
106+
offsetSpecification = defaultValue;
107+
} else {
108+
throw e;
109+
}
110+
}
111+
return offsetSpecification;
112+
}
113+
86114
@BeforeEach
87115
void init(TestInfo info) throws Exception {
88116
EnvironmentBuilder environmentBuilder = Environment.builder().eventLoopGroup(eventLoopGroup);
@@ -521,4 +549,216 @@ void sacAutoOffsetTrackingShouldStoreOnRelanbancing() throws Exception {
521549
.isGreaterThanOrEqualTo(expectedMessageCountPerPartition)
522550
.isEqualTo(c.queryOffset(consumerName, partition).getOffset()));
523551
}
552+
553+
@Test
554+
@SingleActiveConsumer
555+
void sacManualOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception {
556+
declareSuperStreamTopology(connection, superStream, partitionCount);
557+
int messageCount = 5_000;
558+
int storeEvery = messageCount / 100;
559+
AtomicInteger messageWaveCount = new AtomicInteger();
560+
int superConsumerCount = 3;
561+
List<String> partitions =
562+
IntStream.range(0, partitionCount)
563+
.mapToObj(i -> superStream + "-" + i)
564+
.collect(Collectors.toList());
565+
Map<String, Boolean> consumerStates =
566+
new ConcurrentHashMap<>(partitionCount * superConsumerCount);
567+
Map<String, AtomicInteger> receivedMessages =
568+
new ConcurrentHashMap<>(partitionCount * superConsumerCount);
569+
Map<String, AtomicInteger> receivedMessagesPerPartitions =
570+
new ConcurrentHashMap<>(partitionCount);
571+
Map<String, Map<String, Consumer>> consumers = new ConcurrentHashMap<>(superConsumerCount);
572+
IntStream.range(0, superConsumerCount)
573+
.mapToObj(String::valueOf)
574+
.forEach(
575+
consumer ->
576+
partitions.forEach(
577+
partition -> {
578+
consumers.put(consumer, new ConcurrentHashMap<>(partitionCount));
579+
consumerStates.put(consumer + partition, false);
580+
receivedMessages.put(consumer + partition, new AtomicInteger(0));
581+
}));
582+
Map<String, Long> lastReceivedOffsets = new ConcurrentHashMap<>();
583+
partitions.forEach(
584+
partition -> {
585+
lastReceivedOffsets.put(partition, 0L);
586+
receivedMessagesPerPartitions.put(partition, new AtomicInteger(0));
587+
});
588+
Runnable publishOnAllPartitions =
589+
() -> {
590+
partitions.forEach(
591+
partition -> TestUtils.publishAndWaitForConfirms(cf, messageCount, partition));
592+
messageWaveCount.incrementAndGet();
593+
};
594+
String consumerName = "my-app";
595+
596+
OffsetSpecification initialOffsetSpecification = OffsetSpecification.first();
597+
Function<String, Consumer> consumerCreator =
598+
consumer -> {
599+
AtomicInteger received = new AtomicInteger();
600+
ConsumerUpdateListener consumerUpdateListener =
601+
context -> {
602+
consumers.get(consumer).putIfAbsent(context.stream(), context.consumer());
603+
consumerStates.put(consumer + context.stream(), context.status() == Status.ACTIVE);
604+
OffsetSpecification offsetSpecification = null;
605+
if (context.status() == Status.ACTIVE) {
606+
try {
607+
long storedOffset = context.consumer().storedOffset() + 1;
608+
offsetSpecification = OffsetSpecification.offset(storedOffset);
609+
} catch (StreamException e) {
610+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
611+
offsetSpecification = initialOffsetSpecification;
612+
} else {
613+
throw e;
614+
}
615+
}
616+
} else if (context.previousStatus() == Status.ACTIVE
617+
&& context.status() == Status.PASSIVE) {
618+
long lastReceivedOffset = lastReceivedOffsets.get(context.stream());
619+
context.consumer().store(lastReceivedOffset);
620+
waitUntil(() -> context.consumer().storedOffset() == lastReceivedOffset);
621+
}
622+
return offsetSpecification;
623+
};
624+
return environment
625+
.consumerBuilder()
626+
.singleActiveConsumer()
627+
.superStream(superStream)
628+
.offset(initialOffsetSpecification)
629+
.name(consumerName)
630+
.autoTrackingStrategy()
631+
.builder()
632+
.consumerUpdateListener(consumerUpdateListener)
633+
.messageHandler(
634+
(context, message) -> {
635+
if (received.incrementAndGet() % storeEvery == 0) {
636+
context.storeOffset();
637+
}
638+
lastReceivedOffsets.put(context.stream(), context.offset());
639+
receivedMessagesPerPartitions.get(context.stream()).incrementAndGet();
640+
receivedMessages.get(consumer + context.stream()).incrementAndGet();
641+
})
642+
.build();
643+
};
644+
645+
Consumer consumer0 = consumerCreator.apply("0");
646+
waitAtMost(
647+
() ->
648+
consumerStates.get("0" + partitions.get(0))
649+
&& consumerStates.get("0" + partitions.get(1))
650+
&& consumerStates.get("0" + partitions.get(2)));
651+
652+
publishOnAllPartitions.run();
653+
654+
waitAtMost(
655+
() ->
656+
receivedMessages.get("0" + partitions.get(0)).get() == messageCount
657+
&& receivedMessages.get("0" + partitions.get(1)).get() == messageCount
658+
&& receivedMessages.get("0" + partitions.get(2)).get() == messageCount);
659+
660+
Consumer consumer1 = consumerCreator.apply("1");
661+
662+
waitAtMost(
663+
() ->
664+
consumerStates.get("0" + partitions.get(0))
665+
&& consumerStates.get("1" + partitions.get(1))
666+
&& consumerStates.get("0" + partitions.get(2)));
667+
668+
publishOnAllPartitions.run();
669+
670+
waitAtMost(
671+
() ->
672+
receivedMessages.get("0" + partitions.get(0)).get() == messageCount * 2
673+
&& receivedMessages.get("1" + partitions.get(1)).get() == messageCount
674+
&& receivedMessages.get("0" + partitions.get(2)).get() == messageCount * 2);
675+
676+
Consumer consumer2 = consumerCreator.apply("2");
677+
678+
waitAtMost(
679+
() ->
680+
consumerStates.get("0" + partitions.get(0))
681+
&& consumerStates.get("1" + partitions.get(1))
682+
&& consumerStates.get("2" + partitions.get(2)));
683+
684+
publishOnAllPartitions.run();
685+
686+
waitAtMost(
687+
() ->
688+
receivedMessages.get("0" + partitions.get(0)).get() == messageCount * 3
689+
&& receivedMessages.get("1" + partitions.get(1)).get() == messageCount * 2
690+
&& receivedMessages.get("2" + partitions.get(2)).get() == messageCount);
691+
692+
java.util.function.Consumer<String> storeLastProcessedOffsets =
693+
consumerIndex -> {
694+
consumers.get(consumerIndex).entrySet().stream()
695+
.filter(
696+
partitionToInnerConsumer -> {
697+
// we take only the inner consumers that are active
698+
return consumerStates.get(consumerIndex + partitionToInnerConsumer.getKey());
699+
})
700+
.forEach(
701+
entry -> {
702+
String partition = entry.getKey();
703+
Consumer consumer = entry.getValue();
704+
long offset = lastReceivedOffsets.get(partition);
705+
consumer.store(offset);
706+
waitUntil(() -> consumer.storedOffset() == offset);
707+
});
708+
};
709+
storeLastProcessedOffsets.accept("0");
710+
consumer0.close();
711+
712+
waitAtMost(
713+
() ->
714+
consumerStates.get("1" + partitions.get(0))
715+
&& consumerStates.get("2" + partitions.get(1))
716+
&& consumerStates.get("1" + partitions.get(2)));
717+
718+
publishOnAllPartitions.run();
719+
720+
waitAtMost(
721+
() ->
722+
receivedMessages.get("1" + partitions.get(0)).get() == messageCount
723+
&& receivedMessages.get("2" + partitions.get(1)).get() == messageCount
724+
&& receivedMessages.get("1" + partitions.get(2)).get() == messageCount);
725+
726+
storeLastProcessedOffsets.accept("1");
727+
consumer1.close();
728+
729+
waitAtMost(
730+
() ->
731+
consumerStates.get("2" + partitions.get(0))
732+
&& consumerStates.get("2" + partitions.get(1))
733+
&& consumerStates.get("2" + partitions.get(2)));
734+
735+
publishOnAllPartitions.run();
736+
737+
waitAtMost(
738+
() ->
739+
receivedMessages.get("2" + partitions.get(0)).get() == messageCount
740+
&& receivedMessages.get("2" + partitions.get(1)).get() == messageCount * 2
741+
&& receivedMessages.get("2" + partitions.get(2)).get() == messageCount * 2);
742+
743+
storeLastProcessedOffsets.accept("2");
744+
consumer2.close();
745+
746+
assertThat(messageWaveCount).hasValue(5);
747+
assertThat(
748+
receivedMessages.values().stream()
749+
.map(AtomicInteger::get)
750+
.mapToInt(Integer::intValue)
751+
.sum())
752+
.isEqualTo(messageCount * partitionCount * 5);
753+
int expectedMessageCountPerPartition = messageCount * messageWaveCount.get();
754+
receivedMessagesPerPartitions
755+
.values()
756+
.forEach(v -> assertThat(v).hasValue(expectedMessageCountPerPartition));
757+
Client c = cf.get();
758+
partitions.forEach(
759+
partition ->
760+
assertThat(lastReceivedOffsets.get(partition))
761+
.isGreaterThanOrEqualTo(expectedMessageCountPerPartition)
762+
.isEqualTo(c.queryOffset(consumerName, partition).getOffset()));
763+
}
524764
}

0 commit comments

Comments
 (0)