Skip to content

Commit 90c55c7

Browse files
committed
Add SAC test for consumer balance on super stream
References rabbitmq/rabbitmq-server#3753
1 parent 9605cd9 commit 90c55c7

File tree

5 files changed

+136
-12
lines changed

5 files changed

+136
-12
lines changed

src/test/java/com/rabbitmq/stream/impl/AlarmsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515

1616
import static com.rabbitmq.stream.Host.diskAlarm;
1717
import static com.rabbitmq.stream.Host.memoryAlarm;
18+
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
1819
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
1920
import static com.rabbitmq.stream.impl.TestUtils.localhost;
20-
import static com.rabbitmq.stream.impl.TestUtils.responseCode;
2121
import static java.util.concurrent.TimeUnit.SECONDS;
2222
import static java.util.stream.IntStream.range;
2323
import static org.assertj.core.api.Assertions.assertThat;

src/test/java/com/rabbitmq/stream/impl/ClientTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ko;
17+
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.responseCode;
1618
import static com.rabbitmq.stream.impl.TestUtils.b;
1719
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
1820
import static com.rabbitmq.stream.impl.TestUtils.streamName;
@@ -544,8 +546,7 @@ void publishAndConsume(boolean directBuffer) throws Exception {
544546
void deleteNonExistingStreamShouldReturnError() {
545547
String nonExistingStream = UUID.randomUUID().toString();
546548
Client.Response response = cf.get().delete(nonExistingStream);
547-
assertThat(response.isOk()).isFalse();
548-
assertThat(response.getResponseCode()).isEqualTo(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST);
549+
assertThat(response).is(ko()).has(responseCode(Constants.RESPONSE_CODE_STREAM_DOES_NOT_EXIST));
549550
}
550551

551552
@Test

src/test/java/com/rabbitmq/stream/impl/SingleActiveConsumerTest.java

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,31 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.ResponseConditions.ok;
1617
import static com.rabbitmq.stream.impl.TestUtils.b;
18+
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
19+
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
1720
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
21+
import static com.rabbitmq.stream.impl.TestUtils.streamName;
1822
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
1923
import static org.assertj.core.api.Assertions.assertThat;
2024

2125
import com.rabbitmq.client.Connection;
2226
import com.rabbitmq.client.ConnectionFactory;
2327
import com.rabbitmq.stream.OffsetSpecification;
2428
import com.rabbitmq.stream.impl.Client.ClientParameters;
29+
import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener;
2530
import com.rabbitmq.stream.impl.Client.Response;
2631
import java.util.HashMap;
32+
import java.util.List;
2733
import java.util.Map;
2834
import java.util.concurrent.ConcurrentHashMap;
2935
import java.util.concurrent.CountDownLatch;
3036
import java.util.concurrent.atomic.AtomicInteger;
3137
import java.util.concurrent.atomic.AtomicLong;
38+
import java.util.concurrent.atomic.AtomicReference;
39+
import java.util.function.Consumer;
40+
import java.util.stream.Collectors;
3241
import java.util.stream.IntStream;
3342
import org.junit.jupiter.api.Test;
3443
import org.junit.jupiter.api.TestInfo;
@@ -199,7 +208,7 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
199208
Map<Byte, Boolean> consumerStates = consumerStates(2);
200209
AtomicLong lastReceivedOffset = new AtomicLong(0);
201210
Map<Byte, AtomicInteger> receivedMessages = receivedMessages(2);
202-
String superStream = TestUtils.streamName(info);
211+
String superStream = streamName(info);
203212
String consumerName = "foo";
204213
Connection c = new ConnectionFactory().newConnection();
205214
try {
@@ -299,4 +308,95 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
299308
c.close();
300309
}
301310
}
311+
312+
@Test
313+
void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) throws Exception {
314+
Map<Byte, Boolean> consumerStates = consumerStates(3 * 3);
315+
String superStream = streamName(info);
316+
String consumerName = "foo";
317+
Connection c = new ConnectionFactory().newConnection();
318+
// client 1: 0, 1, 2 / client 2: 3, 4, 5, / client 3: 6, 7, 8
319+
try {
320+
declareSuperStreamTopology(c, superStream, 3);
321+
List<String> partitions =
322+
IntStream.range(0, 3).mapToObj(i -> superStream + "-" + i).collect(Collectors.toList());
323+
ConsumerUpdateListener consumerUpdateListener =
324+
(client1, subscriptionId, active) -> {
325+
System.out.println(subscriptionId + " " + active);
326+
consumerStates.put(subscriptionId, active);
327+
return null;
328+
};
329+
Client client1 =
330+
cf.get(new ClientParameters().consumerUpdateListener(consumerUpdateListener));
331+
Map<String, String> subscriptionProperties = new HashMap<>();
332+
subscriptionProperties.put("single-active-consumer", "true");
333+
subscriptionProperties.put("name", consumerName);
334+
subscriptionProperties.put("super-stream", superStream);
335+
AtomicInteger subscriptionCounter = new AtomicInteger(0);
336+
AtomicReference<Client> client = new AtomicReference<>();
337+
Consumer<String> subscriptionCallback =
338+
partition -> {
339+
Response response =
340+
client
341+
.get()
342+
.subscribe(
343+
b(subscriptionCounter.getAndIncrement()),
344+
partition,
345+
OffsetSpecification.first(),
346+
2,
347+
subscriptionProperties);
348+
assertThat(response).is(ok());
349+
};
350+
351+
client.set(client1);
352+
partitions.forEach(subscriptionCallback);
353+
354+
waitAtMost(
355+
() -> consumerStates.get(b(0)) && consumerStates.get(b(1)) && consumerStates.get(b(2)));
356+
357+
Client client2 =
358+
cf.get(new ClientParameters().consumerUpdateListener(consumerUpdateListener));
359+
360+
client.set(client2);
361+
partitions.forEach(subscriptionCallback);
362+
363+
waitAtMost(
364+
() -> consumerStates.get(b(0)) && consumerStates.get(b(4)) && consumerStates.get(b(2)));
365+
366+
Client client3 =
367+
cf.get(new ClientParameters().consumerUpdateListener(consumerUpdateListener));
368+
369+
client.set(client3);
370+
partitions.forEach(subscriptionCallback);
371+
372+
waitAtMost(
373+
() -> consumerStates.get(b(0)) && consumerStates.get(b(4)) && consumerStates.get(b(8)));
374+
375+
Consumer<String> unsubscriptionCallback =
376+
partition -> {
377+
int subId = subscriptionCounter.getAndIncrement();
378+
Response response = client.get().unsubscribe(b(subId));
379+
assertThat(response).is(ok());
380+
consumerStates.put(b(subId), false);
381+
};
382+
383+
subscriptionCounter.set(0);
384+
client.set(client1);
385+
partitions.forEach(unsubscriptionCallback);
386+
387+
waitAtMost(
388+
() -> consumerStates.get(b(3)) && consumerStates.get(b(7)) && consumerStates.get(b(5)));
389+
390+
client.set(client2);
391+
partitions.forEach(unsubscriptionCallback);
392+
393+
waitAtMost(
394+
() -> consumerStates.get(b(6)) && consumerStates.get(b(7)) && consumerStates.get(b(8)));
395+
396+
client.set(client3);
397+
partitions.forEach(unsubscriptionCallback);
398+
} finally {
399+
deleteSuperStreamTopology(c, superStream, 3);
400+
}
401+
}
302402
}

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
1617
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
1718
import static com.rabbitmq.stream.impl.TestUtils.localhost;
1819
import static com.rabbitmq.stream.impl.TestUtils.localhostTls;
@@ -419,7 +420,7 @@ void createStreamWithDifferentParametersShouldThrowException(TestInfo info) {
419420
env.streamCreator().stream(s).maxAge(Duration.ofDays(1)).create();
420421
assertThatThrownBy(() -> env.streamCreator().stream(s).maxAge(Duration.ofDays(4)).create())
421422
.isInstanceOf(StreamException.class)
422-
.has(TestUtils.responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED));
423+
.has(responseCode(Constants.RESPONSE_CODE_PRECONDITION_FAILED));
423424
} finally {
424425
assertThat(client.delete(s).isOk()).isTrue();
425426
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.rabbitmq.stream.StreamException;
2929
import com.rabbitmq.stream.impl.Client.Broker;
3030
import com.rabbitmq.stream.impl.Client.ClientParameters;
31+
import com.rabbitmq.stream.impl.Client.Response;
3132
import com.rabbitmq.stream.impl.Client.StreamMetadata;
3233
import io.netty.channel.EventLoopGroup;
3334
import io.netty.channel.nio.NioEventLoopGroup;
@@ -334,13 +335,34 @@ static CountDownLatchAssert latchAssert(AtomicReference<CountDownLatch> latchRef
334335
return new CountDownLatchAssert(latchReference.get());
335336
}
336337

337-
static Condition<Throwable> responseCode(short expectedResponseCode) {
338-
String message = "expected code for stream exception is " + expectedResponseCode;
339-
return new Condition<>(
340-
throwable ->
341-
throwable instanceof StreamException
342-
&& ((StreamException) throwable).getCode() == expectedResponseCode,
343-
message);
338+
static class ResponseConditions {
339+
340+
static Condition<Response> ok() {
341+
return new Condition<>(Response::isOk, "Response should be OK");
342+
}
343+
344+
static Condition<Response> ko() {
345+
return new Condition<>(response -> !response.isOk(), "Response should be OK");
346+
}
347+
348+
static Condition<Response> responseCode(short expectedResponse) {
349+
return new Condition<>(
350+
response -> response.getResponseCode() == expectedResponse,
351+
"response code %s",
352+
Utils.formatConstant(expectedResponse));
353+
}
354+
}
355+
356+
static class ExceptionConditions {
357+
358+
static Condition<Throwable> responseCode(short expectedResponseCode) {
359+
String message = "code " + Utils.formatConstant(expectedResponseCode);
360+
return new Condition<>(
361+
throwable ->
362+
throwable instanceof StreamException
363+
&& ((StreamException) throwable).getCode() == expectedResponseCode,
364+
message);
365+
}
344366
}
345367

346368
static Map<String, StreamMetadata> metadata(String stream, Broker leader, List<Broker> replicas) {

0 commit comments

Comments
 (0)