Skip to content

Commit 90b8f8b

Browse files
committed
Add AMQP-based filtering test
1 parent ae93fc7 commit 90b8f8b

File tree

1 file changed

+91
-0
lines changed

1 file changed

+91
-0
lines changed

src/test/java/com/rabbitmq/stream/impl/FilteringTest.java

+91
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,19 @@
1414
package com.rabbitmq.stream.impl;
1515

1616
import static com.rabbitmq.stream.impl.TestUtils.*;
17+
import static java.util.Collections.singletonMap;
1718
import static org.assertj.core.api.Assertions.assertThat;
1819
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1920

21+
import com.rabbitmq.client.*;
22+
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
2023
import com.rabbitmq.stream.*;
24+
import com.rabbitmq.stream.Consumer;
2125
import io.netty.channel.EventLoopGroup;
26+
import java.io.IOException;
2227
import java.time.Duration;
2328
import java.util.*;
29+
import java.util.concurrent.Callable;
2430
import java.util.concurrent.ConcurrentHashMap;
2531
import java.util.concurrent.CountDownLatch;
2632
import java.util.concurrent.atomic.AtomicInteger;
@@ -197,6 +203,74 @@ void setFilterSizeOnCreation(TestInfo info) {
197203
.isInstanceOf(IllegalArgumentException.class);
198204
}
199205

206+
@Test
207+
void publishConsumeAmqp() throws Exception {
208+
int messageCount = 1000;
209+
repeatIfFailure(
210+
() -> {
211+
List<String> filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear"));
212+
Map<String, AtomicInteger> filterValueCount = new HashMap<>();
213+
Random random = new Random();
214+
215+
try (Connection c = new ConnectionFactory().newConnection()) {
216+
Callable<Void> insert =
217+
() -> {
218+
publishAmqp(
219+
c,
220+
messageCount,
221+
() -> {
222+
String filterValue = filterValues.get(random.nextInt(filterValues.size()));
223+
filterValueCount
224+
.computeIfAbsent(filterValue, k -> new AtomicInteger())
225+
.incrementAndGet();
226+
return filterValue;
227+
});
228+
return null;
229+
};
230+
insert.call();
231+
232+
// second wave of messages, with only one, new filter value
233+
String newFilterValue = "orange";
234+
filterValues.clear();
235+
filterValues.add(newFilterValue);
236+
insert.call();
237+
238+
AtomicInteger receivedMessageCount = new AtomicInteger(0);
239+
AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0);
240+
Channel ch = c.createChannel();
241+
ch.basicQos(10);
242+
Map<String, Object> arguments = new HashMap<>();
243+
arguments.put("x-stream-filter", newFilterValue);
244+
arguments.put("x-stream-offset", 0);
245+
ch.basicConsume(
246+
stream,
247+
false,
248+
arguments,
249+
new DefaultConsumer(ch) {
250+
@Override
251+
public void handleDelivery(
252+
String consumerTag,
253+
Envelope envelope,
254+
AMQP.BasicProperties properties,
255+
byte[] body)
256+
throws IOException {
257+
receivedMessageCount.incrementAndGet();
258+
String filterValue =
259+
properties.getHeaders().get("x-stream-filter-value").toString();
260+
if (newFilterValue.equals(filterValue)) {
261+
filteredConsumedMessageCount.incrementAndGet();
262+
}
263+
ch.basicAck(envelope.getDeliveryTag(), false);
264+
}
265+
});
266+
int expectedCount = filterValueCount.get(newFilterValue).get();
267+
waitAtMost(
268+
CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount);
269+
assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2);
270+
}
271+
});
272+
}
273+
200274
private ProducerBuilder producerBuilder() {
201275
return this.environment.producerBuilder().stream(stream);
202276
}
@@ -232,6 +306,23 @@ private void publish(
232306
producer.close();
233307
}
234308

309+
private void publishAmqp(Connection c, int messageCount, Supplier<String> filterValueSupplier)
310+
throws Exception {
311+
try (Channel ch = c.createChannel()) {
312+
ch.confirmSelect();
313+
for (int i = 0; i < messageCount; i++) {
314+
ch.basicPublish(
315+
"",
316+
stream,
317+
new Builder()
318+
.headers(singletonMap("x-stream-filter-value", filterValueSupplier.get()))
319+
.build(),
320+
null);
321+
}
322+
ch.waitForConfirmsOrDie();
323+
}
324+
}
325+
235326
private static void repeatIfFailure(RunnableWithException test) throws Exception {
236327
int executionCount = 0;
237328
Throwable lastException = null;

0 commit comments

Comments
 (0)