Skip to content

Commit effeab5

Browse files
committed
Adapt SAC tests to SAC coordinator refactoring
The (de)activation protocol changed a bit internally. New consumers assume they are passive by default, until they are activated (no passive update notification if the consumer was not active previously). References #46, rabbitmq/rabbitmq-server#3753
1 parent 6f6da91 commit effeab5

File tree

4 files changed

+26
-46
lines changed

4 files changed

+26
-46
lines changed

src/main/java/com/rabbitmq/stream/ConsumerUpdateListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ interface Context {
2929
}
3030

3131
enum Status {
32-
STARTING,
3332
ACTIVE,
3433
PASSIVE
3534
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ class StreamConsumer implements Consumer {
121121
}
122122

123123
if (Utils.isSac(subscriptionProperties)) {
124-
this.sacStatus = ConsumerUpdateListener.Status.STARTING;
124+
this.sacStatus = ConsumerUpdateListener.Status.PASSIVE;
125125
MessageHandler existingMessageHandler = decoratedMessageHandler.get();
126126
MessageHandler messageHandlerWithSac =
127127
(context, message) -> {
@@ -138,8 +138,7 @@ class StreamConsumer implements Consumer {
138138
ConsumerUpdateListener defaultListener =
139139
context -> {
140140
OffsetSpecification result = null;
141-
if ((context.previousStatus() == ConsumerUpdateListener.Status.STARTING
142-
|| context.previousStatus() == ConsumerUpdateListener.Status.PASSIVE)
141+
if (context.previousStatus() == ConsumerUpdateListener.Status.PASSIVE
143142
&& context.status() == ConsumerUpdateListener.Status.ACTIVE) {
144143
LOGGER.debug("Looking up offset (stream {})", this.stream);
145144
Consumer consumer = context.consumer();
@@ -193,11 +192,22 @@ class StreamConsumer implements Consumer {
193192
if (context.previousStatus() == ConsumerUpdateListener.Status.PASSIVE
194193
&& context.status() == ConsumerUpdateListener.Status.ACTIVE) {
195194
LOGGER.debug("Going from passive to active, looking up offset");
196-
StreamConsumer consumer = (StreamConsumer) context.consumer();
197-
long offset = consumer.storedOffset();
198-
LOGGER.debug(
199-
"Stored offset is {}, returning the value + 1 to the server", offset);
200-
result = OffsetSpecification.offset(offset + 1);
195+
Consumer consumer = context.consumer();
196+
try {
197+
long offset = consumer.storedOffset();
198+
LOGGER.debug(
199+
"Stored offset is {}, returning the value + 1 to the server", offset);
200+
result = OffsetSpecification.offset(offset + 1);
201+
} catch (StreamException e) {
202+
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
203+
LOGGER.debug(
204+
"No stored offset, using initial offset specification: {}",
205+
this.initialOffsetSpecification);
206+
result = initialOffsetSpecification;
207+
} else {
208+
throw e;
209+
}
210+
}
201211
}
202212
return result;
203213
};

src/test/java/com/rabbitmq/stream/impl/SacClientTest.java

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import static com.rabbitmq.stream.impl.TestUtils.b;
1818
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
1919
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
20-
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
2120
import static com.rabbitmq.stream.impl.TestUtils.streamName;
2221
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2322
import static org.assertj.core.api.Assertions.assertThat;
@@ -32,7 +31,6 @@
3231
import java.util.List;
3332
import java.util.Map;
3433
import java.util.concurrent.ConcurrentHashMap;
35-
import java.util.concurrent.CountDownLatch;
3634
import java.util.concurrent.atomic.AtomicInteger;
3735
import java.util.concurrent.atomic.AtomicLong;
3836
import java.util.concurrent.atomic.AtomicReference;
@@ -70,7 +68,6 @@ void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exceptio
7068
AtomicLong lastReceivedOffset = new AtomicLong(0);
7169
Map<Byte, Boolean> consumerStates = consumerStates(2);
7270
Map<Byte, AtomicInteger> receivedMessages = receivedMessages(2);
73-
CountDownLatch consumerUpdateLatch = new CountDownLatch(2);
7471
String consumerName = "foo";
7572
ClientParameters clientParameters =
7673
new ClientParameters()
@@ -85,7 +82,6 @@ void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exceptio
8582
.consumerUpdateListener(
8683
(client, subscriptionId, active) -> {
8784
consumerStates.put(subscriptionId, active);
88-
consumerUpdateLatch.countDown();
8985
long storedOffset = writerClient.queryOffset(consumerName, stream).getOffset();
9086
if (storedOffset == 0) {
9187
return OffsetSpecification.first();
@@ -104,7 +100,7 @@ void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exceptio
104100
assertThat(response.isOk()).isTrue();
105101
response = client.subscribe(b(1), stream, OffsetSpecification.first(), 2, parameters);
106102
assertThat(response.isOk()).isTrue();
107-
latchAssert(consumerUpdateLatch).completes();
103+
waitAtMost(() -> consumerStates.get(b(0)));
108104
assertThat(consumerStates)
109105
.hasSize(2)
110106
.containsEntry(b(0), Boolean.TRUE)
@@ -135,7 +131,7 @@ void secondSubscriptionShouldTakeOverAfterFirstOneUnsubscribes() throws Exceptio
135131
}
136132

137133
@Test
138-
void consumerUpdateListenerShouldBeCalledInOrder() throws Exception {
134+
void consumerUpdateListenerShouldBeCalledOnlyWhenConsumerGetsActivated() throws Exception {
139135
StringBuffer consumerUpdateHistory = new StringBuffer();
140136
Client client =
141137
cf.get(
@@ -158,11 +154,6 @@ void consumerUpdateListenerShouldBeCalledInOrder() throws Exception {
158154
response =
159155
client.subscribe(subscriptionId, stream, OffsetSpecification.first(), 2, parameters);
160156
assertThat(response.isOk()).isTrue();
161-
waitAtMost(
162-
() ->
163-
consumerUpdateHistory
164-
.toString()
165-
.contains(String.format("<%d.%b>", subscriptionId, false)));
166157
}
167158

168159
for (int i = 0; i < 9; i++) {
@@ -198,10 +189,10 @@ void noConsumerUpdateOnConnectionClosingIfSubscriptionNotUnsubscribed() throws E
198189
assertThat(response.isOk()).isTrue();
199190
response = client.subscribe(b(1), stream, OffsetSpecification.first(), 2, parameters);
200191
assertThat(response.isOk()).isTrue();
201-
waitAtMost(() -> consumerUpdateCount.get() == 2);
192+
waitAtMost(() -> consumerUpdateCount.get() == 1);
202193

203194
client.close();
204-
assertThat(consumerUpdateCount).hasValue(2);
195+
assertThat(consumerUpdateCount).hasValue(1);
205196
}
206197

207198
@Test
@@ -301,7 +292,7 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
301292
response = client.unsubscribe(b(1));
302293
assertThat(response.isOk()).isTrue();
303294
waitAtMost(() -> consumerStates.get(b(0)) == true);
304-
assertThat(consumerStates).containsEntry(b(0), true); // should not change when unsubscribing
295+
assertThat(consumerStates).containsEntry(b(1), true); // should not change when unsubscribing
305296

306297
response = client.unsubscribe(b(0));
307298
assertThat(response.isOk()).isTrue();

src/test/java/com/rabbitmq/stream/impl/SacSuperStreamConsumerTest.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -124,16 +124,13 @@ void sacShouldSpreadAcrossPartitions() throws Exception {
124124
&& consumerStates.get("1" + partitions.get(1)) == Status.ACTIVE
125125
&& consumerStates.get("2" + partitions.get(2)) == Status.ACTIVE);
126126

127-
waitAtMost(() -> consumerStates.size() == 9);
127+
// all 3 from sub 0 activated, then only 1 from sub 1 & sub 2
128+
waitAtMost(() -> consumerStates.size() == 3 + 1 + 1);
128129
assertThat(consumerStates)
129130
.containsEntry("0" + partitions.get(0), Status.ACTIVE)
130131
.containsEntry("0" + partitions.get(1), Status.PASSIVE)
131132
.containsEntry("0" + partitions.get(2), Status.PASSIVE)
132-
.containsEntry("1" + partitions.get(0), Status.PASSIVE)
133133
.containsEntry("1" + partitions.get(1), Status.ACTIVE)
134-
.containsEntry("1" + partitions.get(2), Status.PASSIVE)
135-
.containsEntry("2" + partitions.get(0), Status.PASSIVE)
136-
.containsEntry("2" + partitions.get(1), Status.PASSIVE)
137134
.containsEntry("2" + partitions.get(2), Status.ACTIVE);
138135

139136
consumer0.close();
@@ -144,11 +141,11 @@ void sacShouldSpreadAcrossPartitions() throws Exception {
144141
&& consumerStates.get("2" + partitions.get(1)) == Status.ACTIVE
145142
&& consumerStates.get("1" + partitions.get(2)) == Status.ACTIVE);
146143

144+
waitAtMost(() -> consumerStates.size() == (3 + 1 + 1) + 1 + 1 + 1);
147145
assertThat(consumerStates)
148146
.containsEntry("1" + partitions.get(0), Status.ACTIVE)
149147
.containsEntry("1" + partitions.get(1), Status.PASSIVE)
150148
.containsEntry("1" + partitions.get(2), Status.ACTIVE)
151-
.containsEntry("2" + partitions.get(0), Status.PASSIVE)
152149
.containsEntry("2" + partitions.get(1), Status.ACTIVE)
153150
.containsEntry("2" + partitions.get(2), Status.PASSIVE);
154151

@@ -725,7 +722,6 @@ void sacCustomOffsetTrackingShouldTakeOverOnRelanbancing() throws Exception {
725722
receivedMessagesPerPartitions
726723
.values()
727724
.forEach(v -> assertThat(v).hasValue(expectedMessageCountPerPartition));
728-
Client c = cf.get();
729725
partitions.forEach(
730726
partition ->
731727
assertThat(lastReceivedOffsets.get(partition))
@@ -739,20 +735,4 @@ private static void waitUntil(CallableBooleanSupplier action) {
739735
throw new RuntimeException(e);
740736
}
741737
}
742-
743-
private static OffsetSpecification lastStoredOffset(
744-
Consumer consumer, OffsetSpecification defaultValue) {
745-
OffsetSpecification offsetSpecification;
746-
try {
747-
long storedOffset = consumer.storedOffset();
748-
offsetSpecification = OffsetSpecification.offset(storedOffset);
749-
} catch (StreamException e) {
750-
if (e.getCode() == Constants.RESPONSE_CODE_NO_OFFSET) {
751-
offsetSpecification = defaultValue;
752-
} else {
753-
throw e;
754-
}
755-
}
756-
return offsetSpecification;
757-
}
758738
}

0 commit comments

Comments
 (0)