Skip to content

Commit b86f3f3

Browse files
committed
Add super stream SAC balancing test
References rabbitmq/rabbitmq-server#3753
1 parent c9a004d commit b86f3f3

File tree

5 files changed

+153
-3
lines changed

5 files changed

+153
-3
lines changed

src/main/java/com/rabbitmq/stream/ConsumerUpdateListener.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ interface Context {
2121

2222
Consumer consumer();
2323

24+
String stream();
25+
2426
Status status();
2527

2628
Status previousStatus();

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ private SubscriptionTracker(
250250
} else {
251251
Map<String, String> properties = new ConcurrentHashMap<>(subscriptionProperties.size() + 1);
252252
properties.putAll(subscriptionProperties);
253+
// we propagate the subscription name, used for monitoring
253254
properties.put("name", this.offsetTrackingReference);
254255
this.subscriptionProperties = Collections.unmodifiableMap(properties);
255256
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import java.time.Duration;
2828
import java.util.Collections;
2929
import java.util.Map;
30+
import java.util.Objects;
3031
import java.util.concurrent.CompletableFuture;
3132
import java.util.concurrent.ExecutionException;
3233
import java.util.concurrent.TimeUnit;
3334
import java.util.concurrent.TimeoutException;
3435
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicLong;
3537
import java.util.concurrent.atomic.AtomicReference;
3638
import java.util.function.LongConsumer;
3739
import java.util.function.LongSupplier;
@@ -40,7 +42,10 @@
4042

4143
class StreamConsumer implements Consumer {
4244

45+
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
46+
4347
private static final Logger LOGGER = LoggerFactory.getLogger(StreamConsumer.class);
48+
private final long id;
4449
private final Runnable closingTrackingCallback;
4550
private final AtomicBoolean closed = new AtomicBoolean(false);
4651
private final String name;
@@ -67,6 +72,7 @@ class StreamConsumer implements Consumer {
6772
Map<String, String> subscriptionProperties,
6873
ConsumerUpdateListener consumerUpdateListener) {
6974

75+
this.id = ID_SEQUENCE.getAndIncrement();
7076
try {
7177
this.name = name;
7278
this.stream = stream;
@@ -289,12 +295,12 @@ OffsetSpecification consumerUpdate(boolean active) {
289295

290296
private static class DefaultConsumerUpdateContext implements ConsumerUpdateListener.Context {
291297

292-
private final Consumer consumer;
298+
private final StreamConsumer consumer;
293299
private final ConsumerUpdateListener.Status status;
294300
private final ConsumerUpdateListener.Status previousStatus;
295301

296302
private DefaultConsumerUpdateContext(
297-
Consumer consumer,
303+
StreamConsumer consumer,
298304
ConsumerUpdateListener.Status status,
299305
ConsumerUpdateListener.Status previousStatus) {
300306
this.consumer = consumer;
@@ -307,6 +313,11 @@ public Consumer consumer() {
307313
return this.consumer;
308314
}
309315

316+
@Override
317+
public String stream() {
318+
return this.consumer.stream;
319+
}
320+
310321
@Override
311322
public ConsumerUpdateListener.Status status() {
312323
return this.status;
@@ -316,6 +327,20 @@ public ConsumerUpdateListener.Status status() {
316327
public ConsumerUpdateListener.Status previousStatus() {
317328
return this.previousStatus;
318329
}
330+
331+
@Override
332+
public String toString() {
333+
return "DefaultConsumerUpdateContext{"
334+
+ "consumer="
335+
+ consumer
336+
+ ", stream="
337+
+ stream()
338+
+ ", status="
339+
+ status
340+
+ ", previousStatus="
341+
+ previousStatus
342+
+ '}';
343+
}
319344
}
320345

321346
private boolean canTrack() {
@@ -387,4 +412,26 @@ enum Status {
387412
NOT_AVAILABLE,
388413
CLOSED
389414
}
415+
416+
@Override
417+
public boolean equals(Object o) {
418+
if (this == o) {
419+
return true;
420+
}
421+
if (o == null || getClass() != o.getClass()) {
422+
return false;
423+
}
424+
StreamConsumer that = (StreamConsumer) o;
425+
return id == that.id && stream.equals(that.stream);
426+
}
427+
428+
@Override
429+
public int hashCode() {
430+
return Objects.hash(id, stream);
431+
}
432+
433+
@Override
434+
public String toString() {
435+
return "StreamConsumer{" + "id=" + id + ", stream='" + stream + '\'' + '}';
436+
}
390437
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,12 +133,15 @@ public Consumer build() {
133133
throw new IllegalArgumentException("Stream and superStream cannot be set at the same time");
134134
}
135135
if (this.messageHandler == null) {
136-
throw new IllegalArgumentException("A message handle must be set");
136+
throw new IllegalArgumentException("A message handler must be set");
137137
}
138138
if (this.name == null
139139
&& (this.autoTrackingStrategy != null || this.manualTrackingStrategy != null)) {
140140
throw new IllegalArgumentException("A name must be set if a tracking strategy is specified");
141141
}
142+
if (Utils.isSac(this.subscriptionProperties) && this.name == null) {
143+
throw new IllegalArgumentException("A name must be set if single active consumer is enabled");
144+
}
142145

143146
this.environment.maybeInitializeLocator();
144147
TrackingConfiguration trackingConfiguration;
@@ -179,6 +182,9 @@ public Consumer build() {
179182
this.consumerUpdateListener);
180183
environment.addConsumer((StreamConsumer) consumer);
181184
} else {
185+
if (Utils.isSac(this.subscriptionProperties)) {
186+
this.subscriptionProperties.put("super-stream", this.superStream);
187+
}
182188
consumer =
183189
new SuperStreamConsumer(this, this.superStream, this.environment, trackingConfiguration);
184190
}

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@
1818
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
1919
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
2020
import static com.rabbitmq.stream.impl.TestUtils.localhost;
21+
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2122
import static org.assertj.core.api.Assertions.assertThat;
2223

2324
import com.rabbitmq.client.Connection;
2425
import com.rabbitmq.client.ConnectionFactory;
2526
import com.rabbitmq.stream.Consumer;
27+
import com.rabbitmq.stream.ConsumerUpdateListener;
28+
import com.rabbitmq.stream.ConsumerUpdateListener.Status;
2629
import com.rabbitmq.stream.Environment;
2730
import com.rabbitmq.stream.EnvironmentBuilder;
2831
import com.rabbitmq.stream.OffsetSpecification;
@@ -31,10 +34,14 @@
3134
import java.nio.charset.StandardCharsets;
3235
import java.util.Collections;
3336
import java.util.List;
37+
import java.util.Map;
3438
import java.util.concurrent.ConcurrentHashMap;
3539
import java.util.concurrent.ConcurrentMap;
3640
import java.util.concurrent.CountDownLatch;
3741
import java.util.concurrent.atomic.AtomicInteger;
42+
import java.util.function.Function;
43+
import java.util.stream.Collectors;
44+
import java.util.stream.IntStream;
3845
import org.junit.jupiter.api.AfterEach;
3946
import org.junit.jupiter.api.BeforeEach;
4047
import org.junit.jupiter.api.Test;
@@ -229,4 +236,91 @@ void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
229236
p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset));
230237
consumer.close();
231238
}
239+
240+
@Test
241+
void sacShouldSpreadAcrossPartitions() throws Exception {
242+
declareSuperStreamTopology(connection, superStream, partitionCount);
243+
List<String> partitions =
244+
IntStream.range(0, partitionCount)
245+
.mapToObj(i -> superStream + "-" + i)
246+
.collect(Collectors.toList());
247+
Map<String, Status> consumerStates = new ConcurrentHashMap<>();
248+
String consumerName = "my-app";
249+
Function<String, Consumer> consumerCreator =
250+
consumer -> {
251+
return environment
252+
.consumerBuilder()
253+
.singleActiveConsumer()
254+
.superStream(superStream)
255+
.offset(OffsetSpecification.first())
256+
.name(consumerName)
257+
.manualTrackingStrategy()
258+
.builder()
259+
.messageHandler((context, message) -> {})
260+
.consumerUpdateListener(
261+
new ConsumerUpdateListener() {
262+
@Override
263+
public OffsetSpecification update(Context context) {
264+
consumerStates.put(consumer + context.stream(), context.status());
265+
// System.out.println(context);
266+
return null;
267+
}
268+
})
269+
.build();
270+
};
271+
272+
Consumer consumer0 = consumerCreator.apply("0");
273+
274+
waitAtMost(
275+
() ->
276+
consumerStates.get("0" + partitions.get(0)) == Status.ACTIVE
277+
&& consumerStates.get("0" + partitions.get(1)) == Status.ACTIVE
278+
&& consumerStates.get("0" + partitions.get(2)) == Status.ACTIVE);
279+
280+
Consumer consumer1 = consumerCreator.apply("1");
281+
282+
waitAtMost(
283+
() ->
284+
consumerStates.get("0" + partitions.get(0)) == Status.ACTIVE
285+
&& consumerStates.get("1" + partitions.get(1)) == Status.ACTIVE
286+
&& consumerStates.get("0" + partitions.get(2)) == Status.ACTIVE);
287+
288+
Consumer consumer2 = consumerCreator.apply("2");
289+
290+
waitAtMost(
291+
() ->
292+
consumerStates.get("0" + partitions.get(0)) == Status.ACTIVE
293+
&& consumerStates.get("1" + partitions.get(1)) == Status.ACTIVE
294+
&& consumerStates.get("2" + partitions.get(2)) == Status.ACTIVE);
295+
296+
waitAtMost(() -> consumerStates.size() == 9);
297+
assertThat(consumerStates)
298+
.containsEntry("0" + partitions.get(0), Status.ACTIVE)
299+
.containsEntry("0" + partitions.get(1), Status.PASSIVE)
300+
.containsEntry("0" + partitions.get(2), Status.PASSIVE)
301+
.containsEntry("1" + partitions.get(0), Status.PASSIVE)
302+
.containsEntry("1" + partitions.get(1), Status.ACTIVE)
303+
.containsEntry("1" + partitions.get(2), Status.PASSIVE)
304+
.containsEntry("2" + partitions.get(0), Status.PASSIVE)
305+
.containsEntry("2" + partitions.get(1), Status.PASSIVE)
306+
.containsEntry("2" + partitions.get(2), Status.ACTIVE);
307+
308+
consumer0.close();
309+
310+
waitAtMost(
311+
() ->
312+
consumerStates.get("1" + partitions.get(0)) == Status.ACTIVE
313+
&& consumerStates.get("2" + partitions.get(1)) == Status.ACTIVE
314+
&& consumerStates.get("1" + partitions.get(2)) == Status.ACTIVE);
315+
316+
consumer1.close();
317+
318+
waitAtMost(
319+
() ->
320+
consumerStates.get("2" + partitions.get(0)) == Status.ACTIVE
321+
&& consumerStates.get("2" + partitions.get(1)) == Status.ACTIVE
322+
&& consumerStates.get("2" + partitions.get(2)) == Status.ACTIVE);
323+
324+
consumer2.close();
325+
}
232326
}

0 commit comments

Comments
 (0)