|
27 | 27 | import com.rabbitmq.stream.EnvironmentBuilder;
|
28 | 28 | import com.rabbitmq.stream.OffsetSpecification;
|
29 | 29 | import com.rabbitmq.stream.impl.Client.ClientParameters;
|
30 |
| -import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast; |
31 | 30 | import io.netty.channel.EventLoopGroup;
|
32 | 31 | import java.nio.charset.StandardCharsets;
|
33 | 32 | import java.util.Collections;
|
@@ -97,7 +96,6 @@ void tearDown() throws Exception {
|
97 | 96 | }
|
98 | 97 |
|
99 | 98 | @Test
|
100 |
| - @BrokerVersionAtLeast("3.9.6") |
101 | 99 | void consumeAllMessagesFromAllPartitions() throws Exception {
|
102 | 100 | declareSuperStreamTopology(connection, superStream, partitionCount);
|
103 | 101 | Client client = cf.get();
|
@@ -130,7 +128,6 @@ void consumeAllMessagesFromAllPartitions() throws Exception {
|
130 | 128 | }
|
131 | 129 |
|
132 | 130 | @Test
|
133 |
| - @BrokerVersionAtLeast("3.9.6") |
134 | 131 | void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
|
135 | 132 | declareSuperStreamTopology(connection, superStream, partitionCount);
|
136 | 133 | Client client = cf.get();
|
@@ -180,4 +177,56 @@ void manualOffsetTrackingShouldStoreOnAllPartitions() throws Exception {
|
180 | 177 | p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset));
|
181 | 178 | consumer.close();
|
182 | 179 | }
|
| 180 | + |
| 181 | + @Test |
| 182 | + void autoOffsetTrackingShouldStoreOnAllPartitions() throws Exception { |
| 183 | + declareSuperStreamTopology(connection, superStream, partitionCount); |
| 184 | + Client client = cf.get(); |
| 185 | + List<String> partitions = client.partitions(superStream); |
| 186 | + int messageCount = 10000 * partitionCount; |
| 187 | + publishToPartitions(cf, partitions, messageCount); |
| 188 | + ConcurrentMap<String, AtomicInteger> messagesReceived = new ConcurrentHashMap<>(partitionCount); |
| 189 | + ConcurrentMap<String, Long> lastOffsets = new ConcurrentHashMap<>(partitionCount); |
| 190 | + partitions.forEach( |
| 191 | + p -> { |
| 192 | + messagesReceived.put(p, new AtomicInteger(0)); |
| 193 | + }); |
| 194 | + CountDownLatch consumeLatch = new CountDownLatch(messageCount); |
| 195 | + String consumerName = "my-app"; |
| 196 | + AtomicInteger totalCount = new AtomicInteger(); |
| 197 | + Consumer consumer = |
| 198 | + environment |
| 199 | + .consumerBuilder() |
| 200 | + .superStream(superStream) |
| 201 | + .offset(OffsetSpecification.first()) |
| 202 | + .name(consumerName) |
| 203 | + .autoTrackingStrategy() |
| 204 | + .messageCountBeforeStorage(messageCount / partitionCount / 50) |
| 205 | + .builder() |
| 206 | + .messageHandler( |
| 207 | + (context, message) -> { |
| 208 | + String partition = new String(message.getBodyAsBinary()); |
| 209 | + messagesReceived.get(partition).incrementAndGet(); |
| 210 | + lastOffsets.put(partition, context.offset()); |
| 211 | + totalCount.incrementAndGet(); |
| 212 | + if (totalCount.get() % 50 == 0) { |
| 213 | + context.storeOffset(); |
| 214 | + } |
| 215 | + consumeLatch.countDown(); |
| 216 | + }) |
| 217 | + .build(); |
| 218 | + latchAssert(consumeLatch).completes(); |
| 219 | + assertThat(messagesReceived).hasSize(partitionCount); |
| 220 | + partitions.forEach( |
| 221 | + p -> { |
| 222 | + assertThat(messagesReceived).containsKey(p); |
| 223 | + assertThat(messagesReceived.get(p).get()).isEqualTo(messageCount / partitionCount); |
| 224 | + }); |
| 225 | + // checking stored offsets are big enough |
| 226 | + // offset near the end (the message count per partition minus a few messages) |
| 227 | + long almostLastOffset = messageCount / partitionCount - messageCount / (partitionCount * 10); |
| 228 | + partitions.forEach( |
| 229 | + p -> assertThat(client.queryOffset(consumerName, p)).isGreaterThan(almostLastOffset)); |
| 230 | + consumer.close(); |
| 231 | + } |
183 | 232 | }
|
0 commit comments