Skip to content

Commit 8249b65

Browse files
committed
Add SAC test with custom consumer update listener
To mimick custom offset tracking. References rabbitmq/rabbitmq-server#3753
1 parent c63e872 commit 8249b65

File tree

1 file changed

+63
-0
lines changed

1 file changed

+63
-0
lines changed

src/test/java/com/rabbitmq/stream/impl/StreamConsumerSacTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,64 @@ void autoTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws Ex
8585
receivedMessages.get(1).incrementAndGet();
8686
})
8787
.offset(OffsetSpecification.first())
88+
.autoTrackingStrategy()
89+
.builder()
90+
.build();
91+
92+
publishAndWaitForConfirms(cf, messageCount, stream);
93+
waitAtMost(() -> receivedMessages.getOrDefault(0, new AtomicInteger(0)).get() == messageCount);
94+
95+
assertThat(lastReceivedOffset).hasPositiveValue();
96+
assertThat(receivedMessages.get(1)).hasValue(0);
97+
98+
long firstWaveLimit = lastReceivedOffset.get();
99+
consumer1.close();
100+
101+
publishAndWaitForConfirms(cf, messageCount, stream);
102+
103+
waitAtMost(() -> receivedMessages.getOrDefault(0, new AtomicInteger(1)).get() == messageCount);
104+
assertThat(lastReceivedOffset).hasValueGreaterThan(firstWaveLimit);
105+
assertThat(receivedMessages.get(0)).hasValue(messageCount);
106+
107+
consumer2.close();
108+
}
109+
110+
@Test
111+
void customTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws Exception {
112+
int messageCount = 10000;
113+
Map<Integer, AtomicInteger> receivedMessages = new ConcurrentHashMap<>();
114+
receivedMessages.put(0, new AtomicInteger(0));
115+
receivedMessages.put(1, new AtomicInteger(0));
116+
AtomicLong lastReceivedOffset = new AtomicLong(0);
117+
String consumerName = "foo";
118+
Consumer consumer1 =
119+
environment.consumerBuilder().stream(stream)
120+
.name(consumerName)
121+
.singleActiveConsumer()
122+
.messageHandler(
123+
(context, message) -> {
124+
lastReceivedOffset.set(context.offset());
125+
receivedMessages.get(0).incrementAndGet();
126+
})
127+
.offset(OffsetSpecification.first())
128+
.manualTrackingStrategy()
129+
.builder()
130+
.consumerUpdateListener(context -> OffsetSpecification.offset(lastReceivedOffset.get()))
131+
.build();
132+
133+
Consumer consumer2 =
134+
environment.consumerBuilder().stream(stream)
135+
.name(consumerName)
136+
.singleActiveConsumer()
137+
.messageHandler(
138+
(context, message) -> {
139+
lastReceivedOffset.set(context.offset());
140+
receivedMessages.get(1).incrementAndGet();
141+
})
142+
.offset(OffsetSpecification.first())
143+
.manualTrackingStrategy()
144+
.builder()
145+
.consumerUpdateListener(context -> OffsetSpecification.offset(lastReceivedOffset.get()))
88146
.build();
89147

90148
publishAndWaitForConfirms(cf, messageCount, stream);
@@ -101,5 +159,10 @@ void autoTrackingSecondConsumerShouldTakeOverWhereTheFirstOneLeftOff() throws Ex
101159
waitAtMost(() -> receivedMessages.getOrDefault(0, new AtomicInteger(1)).get() == messageCount);
102160
assertThat(lastReceivedOffset).hasValueGreaterThan(firstWaveLimit);
103161
assertThat(receivedMessages.get(0)).hasValue(messageCount);
162+
163+
consumer2.close();
164+
165+
// nothing stored on the server side
166+
assertThat(cf.get().queryOffset(consumerName, stream)).isZero();
104167
}
105168
}

0 commit comments

Comments
 (0)