Skip to content

Commit 69fbf1a

Browse files
committed
Add SAC test with different groups on same super stream
References #46, rabbitmq/rabbitmq-server#3753
1 parent 19f35e6 commit 69fbf1a

File tree

3 files changed

+119
-18
lines changed

3 files changed

+119
-18
lines changed

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,25 @@ class StreamConsumer implements Consumer {
123123
if (Utils.isSac(subscriptionProperties)) {
124124
this.sacStatus = ConsumerUpdateListener.Status.PASSIVE;
125125
MessageHandler existingMessageHandler = decoratedMessageHandler.get();
126-
MessageHandler messageHandlerWithSac =
127-
(context, message) -> {
128-
if (sacStatus == ConsumerUpdateListener.Status.ACTIVE) {
129-
existingMessageHandler.handle(context, message);
130-
}
131-
};
126+
AtomicBoolean receivedSomething = new AtomicBoolean(false);
127+
MessageHandler messageHandlerWithSac;
128+
if (trackingConfiguration.auto()) {
129+
messageHandlerWithSac =
130+
(context, message) -> {
131+
if (sacStatus == ConsumerUpdateListener.Status.ACTIVE) {
132+
receivedSomething.set(true);
133+
existingMessageHandler.handle(context, message);
134+
}
135+
};
136+
} else {
137+
messageHandlerWithSac =
138+
(context, message) -> {
139+
if (sacStatus == ConsumerUpdateListener.Status.ACTIVE) {
140+
existingMessageHandler.handle(context, message);
141+
}
142+
};
143+
}
144+
132145
decoratedMessageHandler.set(messageHandlerWithSac);
133146

134147
if (consumerUpdateListener == null
@@ -160,17 +173,21 @@ class StreamConsumer implements Consumer {
160173
return result;
161174
} else if (context.previousStatus() == ConsumerUpdateListener.Status.ACTIVE
162175
&& context.status() == ConsumerUpdateListener.Status.PASSIVE) {
163-
LOGGER.debug(
164-
"Storing offset (consumer {}, stream {}) because going from active to passive",
165-
this.id,
166-
this.stream);
167-
long offset = trackingFlushCallback.getAsLong();
168-
LOGGER.debug(
169-
"Making sure offset {} has been stored (consumer {}, stream {})",
170-
offset,
171-
this.id,
172-
this.stream);
173-
waitForOffsetToBeStored(offset);
176+
if (receivedSomething.get()) {
177+
LOGGER.debug(
178+
"Storing offset (consumer {}, stream {}) because going from active to passive",
179+
this.id,
180+
this.stream);
181+
long offset = trackingFlushCallback.getAsLong();
182+
LOGGER.debug(
183+
"Making sure offset {} has been stored (consumer {}, stream {})",
184+
offset,
185+
this.id,
186+
this.stream);
187+
waitForOffsetToBeStored(offset);
188+
} else {
189+
LOGGER.debug("Nothing received yet, no need to store offset");
190+
}
174191
result = OffsetSpecification.none();
175192
}
176193
return result;
@@ -292,7 +309,7 @@ void waitForOffsetToBeStored(long expectedStoredOffset) {
292309
this.name, this.stream, expectedStoredOffset)
293310
.delayPolicy(
294311
BackOffDelayPolicy.fixedWithInitialDelay(
295-
Duration.ofMillis(500), Duration.ofMillis(200)))
312+
Duration.ofMillis(200), Duration.ofMillis(200)))
296313
.retry(exception -> exception instanceof IllegalStateException)
297314
.scheduler(environment.scheduledExecutorService())
298315
.build();

src/test/java/com/rabbitmq/stream/impl/SacClientTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr
318318
String superStream = streamName(info);
319319
String consumerName = "foo";
320320
Connection c = new ConnectionFactory().newConnection();
321+
// subscription distribution
321322
// client 1: 0, 1, 2 / client 2: 3, 4, 5, / client 3: 6, 7, 8
322323
try {
323324
declareSuperStreamTopology(c, superStream, 3);

src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import com.rabbitmq.stream.impl.Utils.CompositeConsumerUpdateListener;
3636
import io.netty.channel.EventLoopGroup;
3737
import java.time.Duration;
38+
import java.util.ArrayList;
39+
import java.util.Collections;
3840
import java.util.List;
3941
import java.util.Map;
4042
import java.util.concurrent.ConcurrentHashMap;
@@ -728,6 +730,87 @@ void sacCustomOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception {
728730
.isGreaterThanOrEqualTo(expectedMessageCountPerPartition - 1));
729731
}
730732

733+
@Test
734+
void consumerGroupsOnSameSuperStreamShouldBeIndependent() throws Exception {
735+
declareSuperStreamTopology(connection, superStream, partitionCount);
736+
int messageCount = 20_000;
737+
int consumerGroupsCount = 3;
738+
List<String> consumerNames =
739+
IntStream.range(0, consumerGroupsCount).mapToObj(i -> "my-app-" + i).collect(toList());
740+
List<String> partitions =
741+
IntStream.range(0, partitionCount).mapToObj(i -> superStream + "-" + i).collect(toList());
742+
Map<String, AtomicInteger> receivedMessages = new ConcurrentHashMap<>(consumerGroupsCount);
743+
Map<String, AtomicInteger> receivedMessagesPerPartitions =
744+
new ConcurrentHashMap<>(consumerGroupsCount * partitionCount);
745+
Map<String, Long> lastReceivedOffsets = new ConcurrentHashMap<>();
746+
partitions.forEach(
747+
partition -> {
748+
lastReceivedOffsets.put(partition, 0L);
749+
receivedMessagesPerPartitions.put(partition, new AtomicInteger(0));
750+
});
751+
752+
List<Consumer> consumers = new ArrayList<>(consumerGroupsCount * partitionCount);
753+
consumerNames.forEach(
754+
consumerName -> {
755+
receivedMessages.put(consumerName, new AtomicInteger(0));
756+
partitions.forEach(
757+
partition -> {
758+
receivedMessagesPerPartitions.put(partition + consumerName, new AtomicInteger(0));
759+
Consumer consumer =
760+
environment
761+
.consumerBuilder()
762+
.singleActiveConsumer()
763+
.superStream(superStream)
764+
.offset(OffsetSpecification.first())
765+
.name(consumerName)
766+
.autoTrackingStrategy()
767+
.builder()
768+
.messageHandler(
769+
(context, message) -> {
770+
lastReceivedOffsets.put(
771+
context.stream() + consumerName, context.offset());
772+
receivedMessagesPerPartitions
773+
.get(context.stream() + consumerName)
774+
.incrementAndGet();
775+
receivedMessages.get(consumerName).incrementAndGet();
776+
})
777+
.build();
778+
consumers.add(consumer);
779+
});
780+
});
781+
782+
partitions.forEach(
783+
partition -> TestUtils.publishAndWaitForConfirms(cf, messageCount, partition));
784+
785+
int messageTotal = messageCount * partitionCount;
786+
consumerNames.forEach(
787+
consumerName -> waitUntil(() -> receivedMessages.get(consumerName).get() == messageTotal));
788+
789+
consumerNames.forEach(
790+
consumerName -> {
791+
// summing the received messages for the consumer group name
792+
assertThat(
793+
receivedMessagesPerPartitions.entrySet().stream()
794+
.filter(e -> e.getKey().endsWith(consumerName))
795+
.mapToInt(e -> e.getValue().get())
796+
.sum())
797+
.isEqualTo(messageTotal);
798+
});
799+
800+
Collections.reverse(consumers);
801+
consumers.forEach(Consumer::close);
802+
803+
Client c = cf.get();
804+
consumerNames.forEach(
805+
consumerName -> {
806+
partitions.forEach(
807+
partition -> {
808+
assertThat(c.queryOffset(consumerName, partition).getOffset())
809+
.isEqualTo(lastReceivedOffsets.get(partition + consumerName));
810+
});
811+
});
812+
}
813+
731814
private static void waitUntil(CallableBooleanSupplier action) {
732815
try {
733816
waitAtMost(action);

0 commit comments

Comments
 (0)