Skip to content

Commit ac440df

Browse files
committed
Adapt auto offset tracking for super stream consumers
The message count before storage setting is divided by the number of partitions to make sure we store offsets approximately as set on the consumer builder (assuming partitions are balanced).
1 parent 301e157 commit ac440df

File tree

5 files changed

+72
-26
lines changed

5 files changed

+72
-26
lines changed

src/main/java/com/rabbitmq/stream/impl/SuperStreamConsumer.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,16 @@ class SuperStreamConsumer implements Consumer {
3939
this.superStream = superStream;
4040
List<String> partitions = environment.locator().partitions(superStream);
4141

42-
// for manual tracking offset strategy only
42+
// for manual offset tracking strategy only
4343
ConsumerState[] states = new ConsumerState[partitions.size()];
4444
Map<String, ConsumerState> partitionToStates = new HashMap<>(partitions.size());
4545
for (int i = 0; i < partitions.size(); i++) {
4646
ConsumerState state = new ConsumerState();
4747
states[i] = state;
4848
partitionToStates.put(partitions.get(i), state);
4949
}
50+
// end of manual offset tracking strategy
51+
5052
for (String partition : partitions) {
5153
ConsumerState state = partitionToStates.get(partition);
5254
MessageHandler messageHandler;
@@ -56,13 +58,21 @@ class SuperStreamConsumer implements Consumer {
5658
} else {
5759
messageHandler = builder.messageHandler();
5860
}
61+
StreamConsumerBuilder subConsumerBuilder = builder.duplicate();
62+
63+
if (trackingConfiguration.enabled() && trackingConfiguration.auto()) {
64+
subConsumerBuilder =
65+
(StreamConsumerBuilder)
66+
subConsumerBuilder
67+
.autoTrackingStrategy()
68+
.messageCountBeforeStorage(
69+
trackingConfiguration.autoMessageCountBeforeStorage() / partitions.size())
70+
.builder();
71+
}
72+
5973
Consumer consumer =
60-
builder
61-
.duplicate()
62-
.lazyInit(true)
63-
.superStream(null)
64-
.messageHandler(messageHandler)
65-
.stream(partition)
74+
subConsumerBuilder.lazyInit(true).superStream(null).messageHandler(messageHandler).stream(
75+
partition)
6676
.build();
6777
consumers.put(partition, consumer);
6878
state.consumer = consumer;

src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java

-7
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.rabbitmq.client.Channel;
2222
import com.rabbitmq.client.Connection;
2323
import com.rabbitmq.client.ConnectionFactory;
24-
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
2524
import java.util.Arrays;
2625
import java.util.List;
2726
import java.util.UUID;
@@ -55,7 +54,6 @@ void tearDown() throws Exception {
5554
}
5655

5756
@Test
58-
@BrokerVersionAtLeast("3.9.6")
5957
void routeShouldReturnEmptyListWhenExchangeDoesNotExist() {
6058
assertThat(cf.get().route("", UUID.randomUUID().toString())).isEmpty();
6159
}
@@ -66,7 +64,6 @@ void partitionsShouldReturnEmptyListWhenExchangeDoesNotExist() {
6664
}
6765

6866
@Test
69-
@BrokerVersionAtLeast("3.9.6")
7067
void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception {
7168
declareSuperStreamTopology(connection, superStream, partitions);
7269

@@ -84,7 +81,6 @@ void partitionsShouldReturnEmptyListWhenThereIsNoBinding() throws Exception {
8481
}
8582

8683
@Test
87-
@BrokerVersionAtLeast("3.9.6")
8884
void routeTopologyWithPartitionCount() throws Exception {
8985
declareSuperStreamTopology(connection, superStream, 3);
9086

@@ -100,7 +96,6 @@ void routeTopologyWithPartitionCount() throws Exception {
10096
}
10197

10298
@Test
103-
@BrokerVersionAtLeast("3.9.6")
10499
void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception {
105100
declareSuperStreamTopology(connection, superStream, 3);
106101
connection.createChannel().queueBind(superStream + "-1", superStream, "0");
@@ -118,7 +113,6 @@ void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception
118113
}
119114

120115
@Test
121-
@BrokerVersionAtLeast("3.9.6")
122116
void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception {
123117
declareSuperStreamTopology(connection, superStream, 3);
124118
Channel channel = connection.createChannel();
@@ -137,7 +131,6 @@ void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception {
137131
}
138132

139133
@Test
140-
@BrokerVersionAtLeast("3.9.6")
141134
void partitionsReturnsCorrectOrder() throws Exception {
142135
String[] partitionNames = {"z", "y", "x"};
143136
declareSuperStreamTopology(connection, superStream, partitionNames);

src/test/java/com/rabbitmq/stream/impl/SuperStreamConsumerTest.java

+52-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.rabbitmq.stream.EnvironmentBuilder;
2828
import com.rabbitmq.stream.OffsetSpecification;
2929
import com.rabbitmq.stream.impl.Client.ClientParameters;
30-
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
3130
import io.netty.channel.EventLoopGroup;
3231
import java.nio.charset.StandardCharsets;
3332
import java.util.Collections;
@@ -97,7 +96,6 @@ void tearDown() throws Exception {
9796
}
9897

9998
@Test
100-
@BrokerVersionAtLeast("3.9.6")
10199
void consumeAllMessagesFromAllPartitions() throws Exception {
102100
declareSuperStreamTopology(connection, superStream, partitionCount);
103101
Client client = cf.get();
@@ -130,7 +128,6 @@ void consumeAllMessagesFromAllPartitions() throws Exception {
130128
}
131129

132130
@Test
133-
@BrokerVersionAtLeast("3.9.6")
134131
void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
135132
declareSuperStreamTopology(connection, superStream, partitionCount);
136133
Client client = cf.get();
@@ -180,4 +177,56 @@ void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
180177
p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset));
181178
consumer.close();
182179
}
180+
181+
@Test
182+
void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
183+
declareSuperStreamTopology(connection, superStream, partitionCount);
184+
Client client = cf.get();
185+
List<String> partitions = client.partitions(superStream);
186+
int messageCount = 10000 * partitionCount;
187+
publishToPartitions(cf, partitions, messageCount);
188+
ConcurrentMap<String, AtomicInteger> messagesReceived = new ConcurrentHashMap<>(partitionCount);
189+
ConcurrentMap<String, Long> lastOffsets = new ConcurrentHashMap<>(partitionCount);
190+
partitions.forEach(
191+
p -> {
192+
messagesReceived.put(p, new AtomicInteger(0));
193+
});
194+
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
195+
String consumerName = "my-app";
196+
AtomicInteger totalCount = new AtomicInteger();
197+
Consumer consumer =
198+
environment
199+
.consumerBuilder()
200+
.superStream(superStream)
201+
.offset(OffsetSpecification.first())
202+
.name(consumerName)
203+
.autoTrackingStrategy()
204+
.messageCountBeforeStorage(messageCount / partitionCount / 50)
205+
.builder()
206+
.messageHandler(
207+
(context, message) -> {
208+
String partition = new String(message.getBodyAsBinary());
209+
messagesReceived.get(partition).incrementAndGet();
210+
lastOffsets.put(partition, context.offset());
211+
totalCount.incrementAndGet();
212+
if (totalCount.get() % 50 == 0) {
213+
context.storeOffset();
214+
}
215+
consumeLatch.countDown();
216+
})
217+
.build();
218+
latchAssert(consumeLatch).completes();
219+
assertThat(messagesReceived).hasSize(partitionCount);
220+
partitions.forEach(
221+
p -> {
222+
assertThat(messagesReceived).containsKey(p);
223+
assertThat(messagesReceived.get(p).get()).isEqualTo(messageCount / partitionCount);
224+
});
225+
// checking stored offsets are big enough
226+
// offset near the end (the message count per partition minus a few messages)
227+
long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10);
228+
partitions.forEach(
229+
p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset));
230+
consumer.close();
231+
}
183232
}

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

-4
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.rabbitmq.stream.EnvironmentBuilder;
2727
import com.rabbitmq.stream.OffsetSpecification;
2828
import com.rabbitmq.stream.Producer;
29-
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
3029
import io.netty.channel.EventLoopGroup;
3130
import java.util.Map;
3231
import java.util.UUID;
@@ -74,7 +73,6 @@ void tearDown() throws Exception {
7473
}
7574

7675
@Test
77-
@BrokerVersionAtLeast("3.9.6")
7876
void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Exception {
7977
int messageCount = 10_000;
8078
declareSuperStreamTopology(connection, superStream, partitions);
@@ -127,7 +125,6 @@ void allMessagesSentToSuperStreamWithHashRoutingShouldBeThenConsumed() throws Ex
127125
}
128126

129127
@Test
130-
@BrokerVersionAtLeast("3.9.6")
131128
void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception {
132129
int messageCount = 10_000;
133130
routingKeys = new String[] {"amer", "emea", "apac"};
@@ -179,7 +176,6 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr
179176
}
180177

181178
@Test
182-
@BrokerVersionAtLeast("3.9.6")
183179
void getLastPublishingIdShouldReturnLowestValue() throws Exception {
184180
int messageCount = 10_000;
185181
declareSuperStreamTopology(connection, superStream, partitions);

src/test/java/com/rabbitmq/stream/impl/SuperStreamTest.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.impl.TestUtils.declareSuperStreamTopology;
17+
import static com.rabbitmq.stream.impl.TestUtils.deleteSuperStreamTopology;
1718
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
1819
import static com.rabbitmq.stream.impl.TestUtils.localhost;
1920
import static org.assertj.core.api.Assertions.assertThat;
@@ -24,7 +25,6 @@
2425
import com.rabbitmq.stream.EnvironmentBuilder;
2526
import com.rabbitmq.stream.OffsetSpecification;
2627
import com.rabbitmq.stream.Producer;
27-
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
2828
import io.netty.channel.EventLoopGroup;
2929
import java.util.UUID;
3030
import java.util.concurrent.CountDownLatch;
@@ -61,15 +61,14 @@ void init(TestInfo info) throws Exception {
6161
void tearDown() throws Exception {
6262
environment.close();
6363
if (routingKeys == null) {
64-
// deleteSuperStreamTopology(connection, superStream, partitions);
64+
deleteSuperStreamTopology(connection, superStream, partitions);
6565
} else {
66-
// deleteSuperStreamTopology(connection, superStream, routingKeys);
66+
deleteSuperStreamTopology(connection, superStream, routingKeys);
6767
}
6868
connection.close();
6969
}
7070

7171
@Test
72-
@BrokerVersionAtLeast("3.9.6")
7372
void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception {
7473
int messageCount = 10_000 * partitions;
7574
declareSuperStreamTopology(connection, superStream, partitions);
@@ -112,7 +111,6 @@ void allMessagesSentWithHashRoutingShouldBeThenConsumed() throws Exception {
112111
}
113112

114113
@Test
115-
@BrokerVersionAtLeast("3.9.6")
116114
void allMessagesSentWithRoutingKeyRoutingShouldBeThenConsumed() throws Exception {
117115
int messageCount = 10_000 * partitions;
118116
routingKeys = new String[] {"amer", "emea", "apac"};

0 commit comments

Comments
 (0)