Skip to content

Commit 8543515

Browse files
committed
Add super stream SAC balancing test
References rabbitmq/rabbitmq-server#3753
1 parent cb2e002 commit 8543515

File tree

8 files changed

+129
-8
lines changed

8 files changed

+129
-8
lines changed

src/main/java/com/rabbitmq/stream/ConsumerUpdateListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ interface Context {
2121

2222
Consumer consumer();
2323

24+
String stream();
25+
2426
Status status();
2527

2628
Status previousStatus();

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ private SubscriptionTracker(
260260
} else {
261261
Map<String, String> properties = new ConcurrentHashMap<>(subscriptionProperties.size() + 1);
262262
properties.putAll(subscriptionProperties);
263+
// we propagate the subscription name, used for monitoring
263264
properties.put("name", this.offsetTrackingReference);
264265
this.subscriptionProperties = Collections.unmodifiableMap(properties);
265266
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,12 +308,12 @@ OffsetSpecification consumerUpdate(boolean active) {
308308

309309
private static class DefaultConsumerUpdateContext implements ConsumerUpdateListener.Context {
310310

311-
private final Consumer consumer;
311+
private final StreamConsumer consumer;
312312
private final ConsumerUpdateListener.Status status;
313313
private final ConsumerUpdateListener.Status previousStatus;
314314

315315
private DefaultConsumerUpdateContext(
316-
Consumer consumer,
316+
StreamConsumer consumer,
317317
ConsumerUpdateListener.Status status,
318318
ConsumerUpdateListener.Status previousStatus) {
319319
this.consumer = consumer;
@@ -326,6 +326,11 @@ public Consumer consumer() {
326326
return this.consumer;
327327
}
328328

329+
@Override
330+
public String stream() {
331+
return this.consumer.stream;
332+
}
333+
329334
@Override
330335
public ConsumerUpdateListener.Status status() {
331336
return this.status;
@@ -335,6 +340,20 @@ public ConsumerUpdateListener.Status status() {
335340
public ConsumerUpdateListener.Status previousStatus() {
336341
return this.previousStatus;
337342
}
343+
344+
@Override
345+
public String toString() {
346+
return "DefaultConsumerUpdateContext{"
347+
+ "consumer="
348+
+ consumer
349+
+ ", stream="
350+
+ stream()
351+
+ ", status="
352+
+ status
353+
+ ", previousStatus="
354+
+ previousStatus
355+
+ '}';
356+
}
338357
}
339358

340359
private boolean canTrack() {

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,15 @@ public Consumer build() {
133133
throw new IllegalArgumentException("Stream and superStream cannot be set at the same time");
134134
}
135135
if (this.messageHandler == null) {
136-
throw new IllegalArgumentException("A message handle must be set");
136+
throw new IllegalArgumentException("A message handler must be set");
137137
}
138138
if (this.name == null
139139
&& (this.autoTrackingStrategy != null || this.manualTrackingStrategy != null)) {
140140
throw new IllegalArgumentException("A name must be set if a tracking strategy is specified");
141141
}
142+
if (Utils.isSac(this.subscriptionProperties) && this.name == null) {
143+
throw new IllegalArgumentException("A name must be set if single active consumer is enabled");
144+
}
142145

143146
this.environment.maybeInitializeLocator();
144147
TrackingConfiguration trackingConfiguration;
@@ -179,6 +182,9 @@ public Consumer build() {
179182
this.consumerUpdateListener);
180183
environment.addConsumer((StreamConsumer) consumer);
181184
} else {
185+
if (Utils.isSac(this.subscriptionProperties)) {
186+
this.subscriptionProperties.put("super-stream", this.superStream);
187+
}
182188
consumer =
183189
new SuperStreamConsumer(this, this.superStream, this.environment, trackingConfiguration);
184190
}

src/test/java/com/rabbitmq/stream/impl/OffsetTrackingTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import static com.rabbitmq.stream.impl.TestUtils.b;
1717
import static com.rabbitmq.stream.impl.TestUtils.forEach;
1818
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
19-
import static com.rabbitmq.stream.impl.TestUtils.responseCode;
2019
import static com.rabbitmq.stream.impl.TestUtils.streamName;
2120
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2221
import static org.assertj.core.api.Assertions.assertThat;

src/test/java/com/rabbitmq/stream/impl/SingleActiveConsumerTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
226226
OffsetSpecification result;
227227

228228
if (previousState == false && active == true) {
229-
long storedOffset = writerClient.queryOffset(consumerName, partition);
229+
long storedOffset = writerClient.queryOffset(consumerName, partition).getOffset();
230230
result =
231231
storedOffset == 0
232232
? OffsetSpecification.first()
@@ -237,7 +237,7 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
237237
try {
238238
waitAtMost(
239239
() ->
240-
writerClient.queryOffset(consumerName, partition)
240+
writerClient.queryOffset(consumerName, partition).getOffset()
241241
== lastReceivedOffset.get());
242242
} catch (Exception e) {
243243
throw new RuntimeException(e);
@@ -290,7 +290,7 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
290290
// clean unsubscription, storing the offset
291291
writerClient.storeOffset(consumerName, partition, lastReceivedOffset.get());
292292
waitAtMost(
293-
() -> writerClient.queryOffset(consumerName, partition) == lastReceivedOffset.get());
293+
() -> writerClient.queryOffset(consumerName, partition).getOffset() == lastReceivedOffset.get());
294294

295295
response = client.unsubscribe(b(1));
296296
assertThat(response.isOk()).isTrue();

src/test/java/com/rabbitmq/stream/impl/StreamConsumerSacTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,6 @@ void customTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws
163163
consumer2.close();
164164

165165
// nothing stored on the server side
166-
assertThat(cf.get().queryOffset(consumerName, stream)).isZero();
166+
assertThat(cf.get().queryOffset(consumerName, stream).getOffset()).isZero();
167167
}
168168
}

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
1919
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
2020
import static com.rabbitmq.stream.impl.TestUtils.localhost;
21+
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2122
import static org.assertj.core.api.Assertions.assertThat;
2223

2324
import com.rabbitmq.client.Connection;
2425
import com.rabbitmq.client.ConnectionFactory;
2526
import com.rabbitmq.stream.Consumer;
27+
import com.rabbitmq.stream.ConsumerUpdateListener;
28+
import com.rabbitmq.stream.ConsumerUpdateListener.Status;
2629
import com.rabbitmq.stream.Environment;
2730
import com.rabbitmq.stream.EnvironmentBuilder;
2831
import com.rabbitmq.stream.OffsetSpecification;
@@ -31,10 +34,14 @@
3134
import java.nio.charset.StandardCharsets;
3235
import java.util.Collections;
3336
import java.util.List;
37+
import java.util.Map;
3438
import java.util.concurrent.ConcurrentHashMap;
3539
import java.util.concurrent.ConcurrentMap;
3640
import java.util.concurrent.CountDownLatch;
3741
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.function.Function;
43+
import java.util.stream.Collectors;
44+
import java.util.stream.IntStream;
3845
import org.junit.jupiter.api.AfterEach;
3946
import org.junit.jupiter.api.BeforeEach;
4047
import org.junit.jupiter.api.Test;
@@ -233,4 +240,91 @@ void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
233240
.isGreaterThan(almostLastOffset));
234241
consumer.close();
235242
}
243+
244+
@Test
245+
void sacShouldSpreadAcrossPartitions() throws Exception {
246+
declareSuperStreamTopology(connection, superStream, partitionCount);
247+
List<String> partitions =
248+
IntStream.range(0, partitionCount)
249+
.mapToObj(i -> superStream + "-" + i)
250+
.collect(Collectors.toList());
251+
Map<String, Status> consumerStates = new ConcurrentHashMap<>();
252+
String consumerName = "my-app";
253+
Function<String, Consumer> consumerCreator =
254+
consumer -> {
255+
return environment
256+
.consumerBuilder()
257+
.singleActiveConsumer()
258+
.superStream(superStream)
259+
.offset(OffsetSpecification.first())
260+
.name(consumerName)
261+
.manualTrackingStrategy()
262+
.builder()
263+
.messageHandler((context, message) -> {})
264+
.consumerUpdateListener(
265+
new ConsumerUpdateListener() {
266+
@Override
267+
public OffsetSpecification update(Context context) {
268+
consumerStates.put(consumer + context.stream(), context.status());
269+
// System.out.println(context);
270+
return null;
271+
}
272+
})
273+
.build();
274+
};
275+
276+
Consumer consumer0 = consumerCreator.apply("0");
277+
278+
waitAtMost(
279+
() ->
280+
consumerStates.get("0" + partitions.get(0)) == Status.ACTIVE
281+
&& consumerStates.get("0" + partitions.get(1)) == Status.ACTIVE
282+
&& consumerStates.get("0" + partitions.get(2)) == Status.ACTIVE);
283+
284+
Consumer consumer1 = consumerCreator.apply("1");
285+
286+
waitAtMost(
287+
() ->
288+
consumerStates.get("0" + partitions.get(0)) == Status.ACTIVE
289+
&& consumerStates.get("1" + partitions.get(1)) == Status.ACTIVE
290+
&& consumerStates.get("0" + partitions.get(2)) == Status.ACTIVE);
291+
292+
Consumer consumer2 = consumerCreator.apply("2");
293+
294+
waitAtMost(
295+
() ->
296+
consumerStates.get("0" + partitions.get(0)) == Status.ACTIVE
297+
&& consumerStates.get("1" + partitions.get(1)) == Status.ACTIVE
298+
&& consumerStates.get("2" + partitions.get(2)) == Status.ACTIVE);
299+
300+
waitAtMost(() -> consumerStates.size() == 9);
301+
assertThat(consumerStates)
302+
.containsEntry("0" + partitions.get(0), Status.ACTIVE)
303+
.containsEntry("0" + partitions.get(1), Status.PASSIVE)
304+
.containsEntry("0" + partitions.get(2), Status.PASSIVE)
305+
.containsEntry("1" + partitions.get(0), Status.PASSIVE)
306+
.containsEntry("1" + partitions.get(1), Status.ACTIVE)
307+
.containsEntry("1" + partitions.get(2), Status.PASSIVE)
308+
.containsEntry("2" + partitions.get(0), Status.PASSIVE)
309+
.containsEntry("2" + partitions.get(1), Status.PASSIVE)
310+
.containsEntry("2" + partitions.get(2), Status.ACTIVE);
311+
312+
consumer0.close();
313+
314+
waitAtMost(
315+
() ->
316+
consumerStates.get("1" + partitions.get(0)) == Status.ACTIVE
317+
&& consumerStates.get("2" + partitions.get(1)) == Status.ACTIVE
318+
&& consumerStates.get("1" + partitions.get(2)) == Status.ACTIVE);
319+
320+
consumer1.close();
321+
322+
waitAtMost(
323+
() ->
324+
consumerStates.get("2" + partitions.get(0)) == Status.ACTIVE
325+
&& consumerStates.get("2" + partitions.get(1)) == Status.ACTIVE
326+
&& consumerStates.get("2" + partitions.get(2)) == Status.ACTIVE);
327+
328+
consumer2.close();
329+
}
236330
}

0 commit comments

Comments
 (0)