Skip to content

Commit 0ffa0de

Browse files
committed
Add SAC test for consumer balance on super stream
References rabbitmq/rabbitmq-server#3753
1 parent 7301ca9 commit 0ffa0de

File tree

5 files changed

+136
-12
lines changed

5 files changed

+136
-12
lines changed

src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
import static com.rabbitmq.stream.Host.diskAlarm;
1717
import static com.rabbitmq.stream.Host.memoryAlarm;
18+
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
1819
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
1920
import static com.rabbitmq.stream.impl.TestUtils.localhost;
20-
import static com.rabbitmq.stream.impl.TestUtils.responseCode;
2121
import static java.util.concurrent.TimeUnit.SECONDS;
2222
import static java.util.stream.IntStream.range;
2323
import static org.assertj.core.api.Assertions.assertThat;

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko;
17+
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode;
1618
import static com.rabbitmq.stream.impl.TestUtils.b;
1719
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
1820
import static com.rabbitmq.stream.impl.TestUtils.streamName;
@@ -581,8 +583,7 @@ void publishAndConsume(boolean directBuffer) throws Exception {
581583
void deleteNonExistingStreamShouldReturnError() {
582584
String nonExistingStream = UUID.randomUUID().toString();
583585
Client.Response response = cf.get().delete(nonExistingStream);
584-
assertThat(response.isOk()).isFalse();
585-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
586+
assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST));
586587
}
587588

588589
@Test

src/test/java/com/rabbitmq/stream/impl/SingleActiveConsumerTest.java

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,31 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok;
1617
import static com.rabbitmq.stream.impl.TestUtils.b;
18+
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
19+
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
1720
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
21+
import static com.rabbitmq.stream.impl.TestUtils.streamName;
1822
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
1923
import static org.assertj.core.api.Assertions.assertThat;
2024

2125
import com.rabbitmq.client.Connection;
2226
import com.rabbitmq.client.ConnectionFactory;
2327
import com.rabbitmq.stream.OffsetSpecification;
2428
import com.rabbitmq.stream.impl.Client.ClientParameters;
29+
import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener;
2530
import com.rabbitmq.stream.impl.Client.Response;
2631
import java.util.HashMap;
32+
import java.util.List;
2733
import java.util.Map;
2834
import java.util.concurrent.ConcurrentHashMap;
2935
import java.util.concurrent.CountDownLatch;
3036
import java.util.concurrent.atomic.AtomicInteger;
3137
import java.util.concurrent.atomic.AtomicLong;
38+
import java.util.concurrent.atomic.AtomicReference;
39+
import java.util.function.Consumer;
40+
import java.util.stream.Collectors;
3241
import java.util.stream.IntStream;
3342
import org.junit.jupiter.api.Test;
3443
import org.junit.jupiter.api.TestInfo;
@@ -199,7 +208,7 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
199208
Map<Byte, Boolean> consumerStates = consumerStates(2);
200209
AtomicLong lastReceivedOffset = new AtomicLong(0);
201210
Map<Byte, AtomicInteger> receivedMessages = receivedMessages(2);
202-
String superStream = TestUtils.streamName(info);
211+
String superStream = streamName(info);
203212
String consumerName = "foo";
204213
Connection c = new ConnectionFactory().newConnection();
205214
try {
@@ -299,4 +308,95 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
299308
c.close();
300309
}
301310
}
311+
312+
@Test
313+
void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) throws Exception {
314+
Map<Byte, Boolean> consumerStates = consumerStates(3 * 3);
315+
String superStream = streamName(info);
316+
String consumerName = "foo";
317+
Connection c = new ConnectionFactory().newConnection();
318+
// client 1: 0, 1, 2 / client 2: 3, 4, 5, / client 3: 6, 7, 8
319+
try {
320+
declareSuperStreamTopology(c, superStream, 3);
321+
List<String> partitions =
322+
IntStream.range(0, 3).mapToObj(i -> superStream + "-" + i).collect(Collectors.toList());
323+
ConsumerUpdateListener consumerUpdateListener =
324+
(client1, subscriptionId, active) -> {
325+
System.out.println(subscriptionId + " " + active);
326+
consumerStates.put(subscriptionId, active);
327+
return null;
328+
};
329+
Client client1 =
330+
cf.get(new ClientParameters().consumerUpdateListener(consumerUpdateListener));
331+
Map<String, String> subscriptionProperties = new HashMap<>();
332+
subscriptionProperties.put("single-active-consumer", "true");
333+
subscriptionProperties.put("name", consumerName);
334+
subscriptionProperties.put("super-stream", superStream);
335+
AtomicInteger subscriptionCounter = new AtomicInteger(0);
336+
AtomicReference<Client> client = new AtomicReference<>();
337+
Consumer<String> subscriptionCallback =
338+
partition -> {
339+
Response response =
340+
client
341+
.get()
342+
.subscribe(
343+
b(subscriptionCounter.getAndIncrement()),
344+
partition,
345+
OffsetSpecification.first(),
346+
2,
347+
subscriptionProperties);
348+
assertThat(response).is(ok());
349+
};
350+
351+
client.set(client1);
352+
partitions.forEach(subscriptionCallback);
353+
354+
waitAtMost(
355+
() -> consumerStates.get(b(0)) && consumerStates.get(b(1)) && consumerStates.get(b(2)));
356+
357+
Client client2 =
358+
cf.get(new ClientParameters().consumerUpdateListener(consumerUpdateListener));
359+
360+
client.set(client2);
361+
partitions.forEach(subscriptionCallback);
362+
363+
waitAtMost(
364+
() -> consumerStates.get(b(0)) && consumerStates.get(b(4)) && consumerStates.get(b(2)));
365+
366+
Client client3 =
367+
cf.get(new ClientParameters().consumerUpdateListener(consumerUpdateListener));
368+
369+
client.set(client3);
370+
partitions.forEach(subscriptionCallback);
371+
372+
waitAtMost(
373+
() -> consumerStates.get(b(0)) && consumerStates.get(b(4)) && consumerStates.get(b(8)));
374+
375+
Consumer<String> unsubscriptionCallback =
376+
partition -> {
377+
int subId = subscriptionCounter.getAndIncrement();
378+
Response response = client.get().unsubscribe(b(subId));
379+
assertThat(response).is(ok());
380+
consumerStates.put(b(subId), false);
381+
};
382+
383+
subscriptionCounter.set(0);
384+
client.set(client1);
385+
partitions.forEach(unsubscriptionCallback);
386+
387+
waitAtMost(
388+
() -> consumerStates.get(b(3)) && consumerStates.get(b(7)) && consumerStates.get(b(5)));
389+
390+
client.set(client2);
391+
partitions.forEach(unsubscriptionCallback);
392+
393+
waitAtMost(
394+
() -> consumerStates.get(b(6)) && consumerStates.get(b(7)) && consumerStates.get(b(8)));
395+
396+
client.set(client3);
397+
partitions.forEach(unsubscriptionCallback);
398+
} finally {
399+
deleteSuperStreamTopology(c, superStream, 3);
400+
}
401+
}
302402
}

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
1617
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
1718
import static com.rabbitmq.stream.impl.TestUtils.localhost;
1819
import static com.rabbitmq.stream.impl.TestUtils.localhostTls;
@@ -424,7 +425,7 @@ void createStreamWithDifferentParametersShouldThrowException(TestInfo info) {
424425
env.streamCreator().stream(s).maxAge(Duration.ofDays(1)).create();
425426
assertThatThrownBy(() -> env.streamCreator().stream(s).maxAge(Duration.ofDays(4)).create())
426427
.isInstanceOf(StreamException.class)
427-
.has(TestUtils.responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED));
428+
.has(responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED));
428429
} finally {
429430
assertThat(client.delete(s).isOk()).isTrue();
430431
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.rabbitmq.stream.StreamException;
3030
import com.rabbitmq.stream.impl.Client.Broker;
3131
import com.rabbitmq.stream.impl.Client.ClientParameters;
32+
import com.rabbitmq.stream.impl.Client.Response;
3233
import com.rabbitmq.stream.impl.Client.StreamMetadata;
3334
import io.netty.channel.EventLoopGroup;
3435
import io.netty.channel.nio.NioEventLoopGroup;
@@ -364,13 +365,34 @@ static CountDownLatchAssert latchAssert(AtomicReference<CountDownLatch> latchRef
364365
return new CountDownLatchAssert(latchReference.get());
365366
}
366367

367-
static Condition<Throwable> responseCode(short expectedResponseCode) {
368-
String message = "expected code for stream exception is " + expectedResponseCode;
369-
return new Condition<>(
370-
throwable ->
371-
throwable instanceof StreamException
372-
&& ((StreamException) throwable).getCode() == expectedResponseCode,
373-
message);
368+
static class ResponseConditions {
369+
370+
static Condition<Response> ok() {
371+
return new Condition<>(Response::isOk, "Response should be OK");
372+
}
373+
374+
static Condition<Response> ko() {
375+
return new Condition<>(response -> !response.isOk(), "Response should be OK");
376+
}
377+
378+
static Condition<Response> responseCode(short expectedResponse) {
379+
return new Condition<>(
380+
response -> response.getResponseCode() == expectedResponse,
381+
"response code %s",
382+
Utils.formatConstant(expectedResponse));
383+
}
384+
}
385+
386+
static class ExceptionConditions {
387+
388+
static Condition<Throwable> responseCode(short expectedResponseCode) {
389+
String message = "code " + Utils.formatConstant(expectedResponseCode);
390+
return new Condition<>(
391+
throwable ->
392+
throwable instanceof StreamException
393+
&& ((StreamException) throwable).getCode() == expectedResponseCode,
394+
message);
395+
}
374396
}
375397

376398
static Map<String, StreamMetadata> metadata(String stream, Broker leader, List<Broker> replicas) {

0 commit comments

Comments
 (0)