|
14 | 14 | package com.rabbitmq.stream.impl;
|
15 | 15 |
|
16 | 16 | import static com.rabbitmq.stream.impl.TestUtils.*;
|
| 17 | +import static java.util.Collections.singletonMap; |
17 | 18 | import static org.assertj.core.api.Assertions.assertThat;
|
18 | 19 | import static org.assertj.core.api.Assertions.assertThatThrownBy;
|
19 | 20 |
|
| 21 | +import com.rabbitmq.client.*; |
| 22 | +import com.rabbitmq.client.AMQP.BasicProperties.Builder; |
20 | 23 | import com.rabbitmq.stream.*;
|
| 24 | +import com.rabbitmq.stream.Consumer; |
21 | 25 | import io.netty.channel.EventLoopGroup;
|
| 26 | +import java.io.IOException; |
22 | 27 | import java.time.Duration;
|
23 | 28 | import java.util.*;
|
| 29 | +import java.util.concurrent.Callable; |
24 | 30 | import java.util.concurrent.ConcurrentHashMap;
|
25 | 31 | import java.util.concurrent.CountDownLatch;
|
26 | 32 | import java.util.concurrent.atomic.AtomicInteger;
|
@@ -197,6 +203,74 @@ void setFilterSizeOnCreation(TestInfo info) {
|
197 | 203 | .isInstanceOf(IllegalArgumentException.class);
|
198 | 204 | }
|
199 | 205 |
|
| 206 | + @Test |
| 207 | + void publishConsumeAmqp() throws Exception { |
| 208 | + int messageCount = 1000; |
| 209 | + repeatIfFailure( |
| 210 | + () -> { |
| 211 | + List<String> filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear")); |
| 212 | + Map<String, AtomicInteger> filterValueCount = new HashMap<>(); |
| 213 | + Random random = new Random(); |
| 214 | + |
| 215 | + try (Connection c = new ConnectionFactory().newConnection()) { |
| 216 | + Callable<Void> insert = |
| 217 | + () -> { |
| 218 | + publishAmqp( |
| 219 | + c, |
| 220 | + messageCount, |
| 221 | + () -> { |
| 222 | + String filterValue = filterValues.get(random.nextInt(filterValues.size())); |
| 223 | + filterValueCount |
| 224 | + .computeIfAbsent(filterValue, k -> new AtomicInteger()) |
| 225 | + .incrementAndGet(); |
| 226 | + return filterValue; |
| 227 | + }); |
| 228 | + return null; |
| 229 | + }; |
| 230 | + insert.call(); |
| 231 | + |
| 232 | + // second wave of messages, with only one, new filter value |
| 233 | + String newFilterValue = "orange"; |
| 234 | + filterValues.clear(); |
| 235 | + filterValues.add(newFilterValue); |
| 236 | + insert.call(); |
| 237 | + |
| 238 | + AtomicInteger receivedMessageCount = new AtomicInteger(0); |
| 239 | + AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0); |
| 240 | + Channel ch = c.createChannel(); |
| 241 | + ch.basicQos(10); |
| 242 | + Map<String, Object> arguments = new HashMap<>(); |
| 243 | + arguments.put("x-stream-filter", newFilterValue); |
| 244 | + arguments.put("x-stream-offset", 0); |
| 245 | + ch.basicConsume( |
| 246 | + stream, |
| 247 | + false, |
| 248 | + arguments, |
| 249 | + new DefaultConsumer(ch) { |
| 250 | + @Override |
| 251 | + public void handleDelivery( |
| 252 | + String consumerTag, |
| 253 | + Envelope envelope, |
| 254 | + AMQP.BasicProperties properties, |
| 255 | + byte[] body) |
| 256 | + throws IOException { |
| 257 | + receivedMessageCount.incrementAndGet(); |
| 258 | + String filterValue = |
| 259 | + properties.getHeaders().get("x-stream-filter-value").toString(); |
| 260 | + if (newFilterValue.equals(filterValue)) { |
| 261 | + filteredConsumedMessageCount.incrementAndGet(); |
| 262 | + } |
| 263 | + ch.basicAck(envelope.getDeliveryTag(), false); |
| 264 | + } |
| 265 | + }); |
| 266 | + int expectedCount = filterValueCount.get(newFilterValue).get(); |
| 267 | + waitAtMost( |
| 268 | + CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount); |
| 269 | + assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2); |
| 270 | + } |
| 271 | + }); |
| 272 | + } |
| 273 | + |
200 | 274 | private ProducerBuilder producerBuilder() {
|
201 | 275 | return this.environment.producerBuilder().stream(stream);
|
202 | 276 | }
|
@@ -232,6 +306,23 @@ private void publish(
|
232 | 306 | producer.close();
|
233 | 307 | }
|
234 | 308 |
|
| 309 | + private void publishAmqp(Connection c, int messageCount, Supplier<String> filterValueSupplier) |
| 310 | + throws Exception { |
| 311 | + try (Channel ch = c.createChannel()) { |
| 312 | + ch.confirmSelect(); |
| 313 | + for (int i = 0; i < messageCount; i++) { |
| 314 | + ch.basicPublish( |
| 315 | + "", |
| 316 | + stream, |
| 317 | + new Builder() |
| 318 | + .headers(singletonMap("x-stream-filter-value", filterValueSupplier.get())) |
| 319 | + .build(), |
| 320 | + null); |
| 321 | + } |
| 322 | + ch.waitForConfirmsOrDie(); |
| 323 | + } |
| 324 | + } |
| 325 | + |
235 | 326 | private static void repeatIfFailure(RunnableWithException test) throws Exception {
|
236 | 327 | int executionCount = 0;
|
237 | 328 | Throwable lastException = null;
|
|
0 commit comments