Skip to content

Commit c44a349

Browse files
committed
Test that credit requests are ignored for inactive consumers
1 parent b7f3297 commit c44a349

File tree

3 files changed

+128
-9
lines changed

3 files changed

+128
-9
lines changed

src/test/java/com/rabbitmq/stream/impl/SacClientTest.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,31 @@
1717
import static com.rabbitmq.stream.impl.TestUtils.b;
1818
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
1919
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
20+
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
2021
import static com.rabbitmq.stream.impl.TestUtils.streamName;
2122
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2223
import static java.util.stream.Collectors.toList;
2324
import static org.assertj.core.api.Assertions.assertThat;
2425

2526
import com.rabbitmq.client.Connection;
2627
import com.rabbitmq.client.ConnectionFactory;
28+
import com.rabbitmq.stream.Constants;
2729
import com.rabbitmq.stream.Host;
2830
import com.rabbitmq.stream.OffsetSpecification;
2931
import com.rabbitmq.stream.impl.Client.ClientParameters;
3032
import com.rabbitmq.stream.impl.Client.ConsumerUpdateListener;
33+
import com.rabbitmq.stream.impl.Client.CreditNotification;
34+
import com.rabbitmq.stream.impl.Client.MessageListener;
3135
import com.rabbitmq.stream.impl.Client.Response;
3236
import com.rabbitmq.stream.impl.TestUtils.DisabledIfRabbitMqCtlNotSet;
37+
import java.nio.charset.StandardCharsets;
38+
import java.util.Collections;
3339
import java.util.HashMap;
3440
import java.util.List;
3541
import java.util.Map;
3642
import java.util.concurrent.ConcurrentHashMap;
43+
import java.util.concurrent.CountDownLatch;
44+
import java.util.concurrent.atomic.AtomicBoolean;
3745
import java.util.concurrent.atomic.AtomicInteger;
3846
import java.util.concurrent.atomic.AtomicLong;
3947
import java.util.concurrent.atomic.AtomicReference;
@@ -456,4 +464,114 @@ void killingConnectionsShouldTriggerConsumerUpdateNotification() throws Exceptio
456464
&& consumerStates.get(consumerName + "-connection-1"));
457465
}
458466
}
467+
468+
@Test
469+
void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Exception {
470+
Map<Byte, Boolean> consumerStates = consumerStates(3 * 3);
471+
String superStream = streamName(info);
472+
String consumerName = "foo";
473+
Connection c = new ConnectionFactory().newConnection();
474+
AtomicBoolean keepPublishing = new AtomicBoolean(true);
475+
try {
476+
declareSuperStreamTopology(c, superStream, 3);
477+
// we use the second partition because a rebalancing occurs
478+
// when the second consumer joins
479+
String partitionInUse = superStream + "-1";
480+
481+
Client publisher = cf.get();
482+
publisher.declarePublisher(b(0), null, partitionInUse);
483+
new Thread(
484+
() -> {
485+
while (keepPublishing.get()) {
486+
publisher.publish(
487+
b(0),
488+
Collections.singletonList(
489+
publisher
490+
.messageBuilder()
491+
.addData("hello".getBytes(StandardCharsets.UTF_8))
492+
.build()));
493+
TestUtils.waitMs(10);
494+
}
495+
})
496+
.start();
497+
498+
AtomicLong lastDispatchedOffset = new AtomicLong(0);
499+
ConsumerUpdateListener consumerUpdateListener =
500+
(client1, subscriptionId, active) -> {
501+
consumerStates.put(subscriptionId, active);
502+
return lastDispatchedOffset.get() == 0
503+
? OffsetSpecification.first()
504+
: OffsetSpecification.offset(lastDispatchedOffset.get());
505+
};
506+
CountDownLatch receivedMessagesLatch = new CountDownLatch(100);
507+
MessageListener messageListener =
508+
(subscriptionId, offset, chunkTimestamp, message) -> {
509+
lastDispatchedOffset.set(offset);
510+
receivedMessagesLatch.countDown();
511+
};
512+
AtomicInteger creditNotificationResponseCode = new AtomicInteger();
513+
// we keep track of credit errors
514+
// with the amount of initial credit and the rebalancing,
515+
// the first subscriber is likely to have in-flight credit commands
516+
// when it becomes inactive. The server should then sends some credit
517+
// notifications to tell the client it's not supposed to ask for credits
518+
// for this subscription.
519+
CreditNotification creditNotification =
520+
(subscriptionId, responseCode) -> creditNotificationResponseCode.set(responseCode);
521+
ClientParameters clientParameters =
522+
new ClientParameters()
523+
.chunkListener(
524+
(client, subscriptionId, offset, messageCount, dataSize) ->
525+
client.credit(subscriptionId, 1))
526+
.messageListener(messageListener)
527+
.creditNotification(creditNotification)
528+
.consumerUpdateListener(consumerUpdateListener);
529+
Client client1 = cf.get(clientParameters);
530+
Map<String, String> subscriptionProperties = new HashMap<>();
531+
subscriptionProperties.put("single-active-consumer", "true");
532+
subscriptionProperties.put("name", consumerName);
533+
subscriptionProperties.put("super-stream", superStream);
534+
AtomicInteger subscriptionCounter = new AtomicInteger(0);
535+
AtomicReference<Client> client = new AtomicReference<>();
536+
Consumer<String> subscriptionCallback =
537+
partition -> {
538+
Response response =
539+
client
540+
.get()
541+
.subscribe(
542+
b(subscriptionCounter.getAndIncrement()),
543+
partition,
544+
OffsetSpecification.first(),
545+
10,
546+
subscriptionProperties);
547+
assertThat(response).is(ok());
548+
};
549+
550+
client.set(client1);
551+
subscriptionCallback.accept(partitionInUse);
552+
553+
waitAtMost(() -> consumerStates.get(b(0)));
554+
555+
latchAssert(receivedMessagesLatch).completes();
556+
557+
Client client2 = cf.get(clientParameters);
558+
559+
client.set(client2);
560+
subscriptionCallback.accept(partitionInUse);
561+
562+
waitAtMost(() -> consumerStates.get(b(1)));
563+
564+
waitAtMost(
565+
() ->
566+
creditNotificationResponseCode.get() == Constants.RESPONSE_CODE_PRECONDITION_FAILED);
567+
568+
Response response = client1.unsubscribe(b(0));
569+
assertThat(response).is(ok());
570+
response = client2.unsubscribe(b(1));
571+
assertThat(response).is(ok());
572+
} finally {
573+
keepPublishing.set(false);
574+
deleteSuperStreamTopology(c, superStream, 3);
575+
}
576+
}
459577
}

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.rabbitmq.stream.impl.TestUtils.localhost;
1919
import static com.rabbitmq.stream.impl.TestUtils.streamName;
2020
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
21+
import static com.rabbitmq.stream.impl.TestUtils.waitMs;
2122
import static java.lang.String.format;
2223
import static org.assertj.core.api.Assertions.assertThat;
2324
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -97,14 +98,6 @@ static Stream<java.util.function.Consumer<Object>> consumerShouldKeepConsumingAf
9798
"broker is restarted"));
9899
}
99100

100-
private static void waitMs(long waitTime) {
101-
try {
102-
Thread.sleep(waitTime);
103-
} catch (InterruptedException e) {
104-
throw new RuntimeException(e);
105-
}
106-
}
107-
108101
@BeforeEach
109102
void init() {
110103
if (Host.isOnDocker()) {

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -848,7 +848,15 @@ static Level newLoggerLevel(Class<?> c, Level level) {
848848
logger.setLevel(level);
849849
return initialLevel;
850850
}
851-
851+
852+
static void waitMs(long waitTime) {
853+
try {
854+
Thread.sleep(waitTime);
855+
} catch (InterruptedException e) {
856+
throw new RuntimeException(e);
857+
}
858+
}
859+
852860
@Target({ElementType.TYPE, ElementType.METHOD})
853861
@Retention(RetentionPolicy.RUNTIME)
854862
@Tag("single-active-consumer")

0 commit comments

Comments
 (0)