Skip to content

Commit f9a1769

Browse files
committed
Add filtering test with STOMP
1 parent 697aed6 commit f9a1769

File tree

3 files changed

+200
-61
lines changed

3 files changed

+200
-61
lines changed

src/test/java/com/rabbitmq/stream/impl/FilteringTest.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package com.rabbitmq.stream.impl;
1616

1717
import static com.rabbitmq.stream.impl.TestUtils.*;
18+
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchReferenceConditions.completed;
1819
import static java.util.Collections.singletonMap;
1920
import static java.util.stream.Collectors.toList;
2021
import static org.assertj.core.api.Assertions.assertThat;
@@ -326,7 +327,7 @@ void superStream(TestInfo info) throws Exception {
326327
.build(),
327328
confirmationHandler);
328329
});
329-
latchAssert(latch).completes(CONDITION_TIMEOUT);
330+
assertThat(latch).is(completed());
330331
};
331332

332333
insert.run();
@@ -363,6 +364,7 @@ void superStream(TestInfo info) throws Exception {
363364
waitAtMost(
364365
CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount);
365366
assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2);
367+
consumer.close();
366368
} finally {
367369
deleteSuperStreamTopology(configurationClient, superStream);
368370
}

src/test/java/com/rabbitmq/stream/impl/StompInteroperabilityTest.java

+177-50
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,7 @@
2020
import static org.assertj.core.api.Assertions.assertThat;
2121
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2222

23-
import com.rabbitmq.stream.Environment;
24-
import com.rabbitmq.stream.EnvironmentBuilder;
25-
import com.rabbitmq.stream.Message;
26-
import com.rabbitmq.stream.OffsetSpecification;
27-
import com.rabbitmq.stream.Producer;
23+
import com.rabbitmq.stream.*;
2824
import io.netty.channel.EventLoopGroup;
2925
import io.netty.channel.nio.NioEventLoopGroup;
3026
import java.io.BufferedReader;
@@ -34,28 +30,15 @@
3430
import java.net.Socket;
3531
import java.nio.charset.StandardCharsets;
3632
import java.time.Duration;
37-
import java.util.HashMap;
38-
import java.util.List;
39-
import java.util.Map;
40-
import java.util.Set;
41-
import java.util.UUID;
42-
import java.util.concurrent.ConcurrentHashMap;
43-
import java.util.concurrent.CopyOnWriteArrayList;
44-
import java.util.concurrent.CountDownLatch;
45-
import java.util.concurrent.ExecutorService;
46-
import java.util.concurrent.Executors;
47-
import java.util.concurrent.Future;
48-
import java.util.concurrent.TimeoutException;
33+
import java.util.*;
34+
import java.util.concurrent.*;
4935
import java.util.concurrent.atomic.AtomicBoolean;
5036
import java.util.concurrent.atomic.AtomicInteger;
5137
import java.util.concurrent.atomic.AtomicLong;
5238
import java.util.concurrent.atomic.AtomicReference;
5339
import java.util.function.Predicate;
54-
import org.junit.jupiter.api.AfterAll;
55-
import org.junit.jupiter.api.AfterEach;
56-
import org.junit.jupiter.api.BeforeAll;
57-
import org.junit.jupiter.api.BeforeEach;
58-
import org.junit.jupiter.api.Test;
40+
import java.util.function.Supplier;
41+
import org.junit.jupiter.api.*;
5942
import org.junit.jupiter.api.extension.ExtendWith;
6043

6144
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
@@ -64,6 +47,8 @@ public class StompInteroperabilityTest {
6447

6548
public static final String MESSAGE_ID = "message-id";
6649
public static final String X_STREAM_OFFSET = "x-stream-offset";
50+
public static final String X_STREAM_FILTER_SIZE_BYTES = "x-stream-filter-size-bytes";
51+
public static final String X_STREAM_FILTER_VALUE = "x-stream-filter-value";
6752
public static final String MESSAGE_COMMAND = "MESSAGE";
6853
public static final String ACK_COMMAND = "ACK";
6954
private static final String NEW_LINE = "\n";
@@ -109,20 +94,19 @@ static long offset(String line) {
10994
void init() throws Exception {
11095
environmentBuilder = Environment.builder();
11196
env = environmentBuilder.netty().eventLoopGroup(eventLoopGroup).environmentBuilder().build();
112-
socket = new Socket("localhost", 61613);
113-
out = socket.getOutputStream();
114-
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
11597
executorService = Executors.newSingleThreadExecutor();
11698
}
11799

118100
@AfterEach
119101
void tearDown() throws Exception {
120102
env.close();
121-
socket.close();
122103
executorService.shutdownNow();
123104
}
124105

125106
void stompConnect() throws Exception {
107+
socket = new Socket("localhost", 61613);
108+
out = socket.getOutputStream();
109+
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
126110
byte[] connect =
127111
frameBuilder()
128112
.command("CONNECT")
@@ -139,6 +123,28 @@ void stompConnect() throws Exception {
139123
}
140124
}
141125

126+
void stompDisconnect() throws Exception {
127+
String receipt = UUID.randomUUID().toString();
128+
byte[] connect =
129+
frameBuilder()
130+
.command("DISCONNECT")
131+
.header("receipt", receipt)
132+
.header("passcode", "guest")
133+
.build();
134+
out.write(connect);
135+
waitForReceipt(receipt);
136+
socket.close();
137+
}
138+
139+
void waitForReceipt(String receipt) throws Exception {
140+
AtomicBoolean gotReceipt = new AtomicBoolean(false);
141+
read(
142+
line -> {
143+
gotReceipt.compareAndSet(false, line.contains(receipt));
144+
return line.equals(NULL) && gotReceipt.get();
145+
});
146+
}
147+
142148
void read(Predicate<String> condition) throws Exception {
143149
read(condition, Duration.ofSeconds(10));
144150
}
@@ -181,13 +187,7 @@ void publishToStompDestinationConsumeFromStream() throws Exception {
181187
.build();
182188

183189
out.write(frame);
184-
185-
AtomicBoolean gotReceipt = new AtomicBoolean(false);
186-
read(
187-
line -> {
188-
gotReceipt.compareAndSet(false, line.contains(receipt));
189-
return line.equals(NULL) && gotReceipt.get();
190-
});
190+
waitForReceipt(receipt);
191191

192192
CountDownLatch latch = new CountDownLatch(1);
193193
AtomicReference<Message> messageReference = new AtomicReference<>();
@@ -215,34 +215,39 @@ void publishToStompDestinationConsumeFromStream() throws Exception {
215215

216216
assertThat(message.getMessageAnnotations().get("x-routing-key")).isEqualTo(stream);
217217
assertThat(message.getMessageAnnotations().get("x-exchange")).isEqualTo("");
218+
stompDisconnect();
218219
}
219220

220221
void stompSubscribe(String stream, String ack, int prefetchCount) throws Exception {
221-
stompSubscribe(stream, ack, prefetchCount, null);
222+
stompSubscribe(stream, ack, prefetchCount, null, Collections.emptyMap());
222223
}
223224

224225
void stompSubscribe(String stream, String ack, int prefetchCount, String offset)
225226
throws Exception {
226-
String receipt = UUID.randomUUID().toString();
227-
FrameBuilder builder =
228-
frameBuilder()
229-
.command("SUBSCRIBE")
230-
.header("id", "0")
231-
.header("destination", "/amq/queue/" + stream)
232-
.header("ack", ack)
233-
.header("prefetch-count", String.valueOf(prefetchCount))
234-
.header("receipt", receipt);
227+
stompSubscribe(stream, ack, prefetchCount, offset, Collections.emptyMap());
228+
}
235229

230+
void stompSubscribe(
231+
String stream, String ack, int prefetchCount, String offset, Map<String, String> headers)
232+
throws Exception {
233+
String receipt = UUID.randomUUID().toString();
234+
Map<String, String> defaultHeaders = new LinkedHashMap<>();
235+
defaultHeaders.put("id", "0");
236+
defaultHeaders.put("destination", "/amq/queue/" + stream);
237+
defaultHeaders.put("ack", ack);
238+
defaultHeaders.put("prefetch-count", String.valueOf(prefetchCount));
239+
defaultHeaders.put("receipt", receipt);
236240
if (offset != null) {
237-
builder.header("x-stream-offset", offset);
241+
defaultHeaders.put("x-stream-offset", offset);
238242
}
243+
defaultHeaders.putAll(headers);
244+
245+
FrameBuilder builder = frameBuilder().command("SUBSCRIBE");
246+
247+
defaultHeaders.forEach(builder::header);
248+
239249
out.write(builder.build());
240-
AtomicBoolean gotReceipt = new AtomicBoolean(false);
241-
read(
242-
line -> {
243-
gotReceipt.compareAndSet(false, line.contains(receipt));
244-
return line.equals(NULL) && gotReceipt.get();
245-
});
250+
waitForReceipt(receipt);
246251
}
247252

248253
@Test
@@ -309,6 +314,8 @@ void publishToStreamConsumeFromStomp() throws Exception {
309314
assertThat(headers.get("timestamp")).isEqualTo("1000"); // in seconds
310315
assertThat(headers.get("some-header")).isEqualTo("some header value");
311316
assertThat(headers.get("x-stream-offset")).isNotNull().isEqualTo("0");
317+
318+
stompDisconnect();
312319
}
313320

314321
@Test
@@ -350,6 +357,122 @@ void offsetTypeFirstShouldStartConsumingFromBeginning() throws Exception {
350357

351358
assertThat(first.get()).isEqualTo(0);
352359
assertThat(last.get()).isEqualTo(messageCount - 1);
360+
stompDisconnect();
361+
}
362+
363+
@Test
364+
@DisabledIfFilteringNotSupported
365+
void filtering(TestInfo info) throws Exception {
366+
int messageCount = 1000;
367+
repeatIfFailure(
368+
() -> {
369+
String s = TestUtils.streamName(info);
370+
371+
stompConnect();
372+
Map<String, String> headers = new LinkedHashMap<>();
373+
headers.put("destination", "/topic/stream-queue-test");
374+
headers.put("x-queue-name", s);
375+
headers.put("x-queue-type", "stream");
376+
headers.put(X_STREAM_FILTER_SIZE_BYTES, "32");
377+
headers.put("durable", "true");
378+
headers.put("auto-delete", "false");
379+
stompSubscribe("does not matter", "client", 10, "first", headers);
380+
stompDisconnect();
381+
382+
List<String> filterValues = new ArrayList<>(Arrays.asList("apple", "banana", "pear"));
383+
Map<String, AtomicInteger> filterValueCount = new HashMap<>();
384+
Random random = new Random();
385+
386+
Callable<Void> insert =
387+
() -> {
388+
stompConnect();
389+
publishStomp(
390+
messageCount,
391+
"/amq/queue/" + s,
392+
() -> {
393+
String filterValue = filterValues.get(random.nextInt(filterValues.size()));
394+
filterValueCount
395+
.computeIfAbsent(filterValue, k -> new AtomicInteger())
396+
.incrementAndGet();
397+
return filterValue;
398+
});
399+
stompDisconnect();
400+
return null;
401+
};
402+
insert.call();
403+
404+
// second wave of messages, with only one, new filter value
405+
String newFilterValue = "orange";
406+
filterValues.clear();
407+
filterValues.add(newFilterValue);
408+
insert.call();
409+
410+
try {
411+
stompConnect();
412+
int prefetchCount = 10;
413+
headers.clear();
414+
headers.put("destination", "/amq/queue/" + s);
415+
headers.put("x-stream-filter", newFilterValue);
416+
headers.put("x-stream-match-unfiltered", "true");
417+
stompSubscribe("does not matter", "client", prefetchCount, "first", headers);
418+
419+
int expectedCount = filterValueCount.get(newFilterValue).get();
420+
421+
AtomicInteger receivedMessageCount = new AtomicInteger(0);
422+
AtomicInteger filteredConsumedMessageCount = new AtomicInteger(0);
423+
AtomicReference<String> lastMessageId = new AtomicReference<>();
424+
read(
425+
line -> {
426+
if (line.contains(MESSAGE_COMMAND)) {
427+
receivedMessageCount.incrementAndGet();
428+
}
429+
if (hasHeader(line, MESSAGE_ID)) {
430+
lastMessageId.set(header(line));
431+
}
432+
if (hasHeader(line, X_STREAM_FILTER_VALUE)) {
433+
String filterValue = header(line);
434+
if (newFilterValue.equals(filterValue)) {
435+
filteredConsumedMessageCount.incrementAndGet();
436+
}
437+
}
438+
if (line.contains(NULL) && receivedMessageCount.get() % prefetchCount == 0) {
439+
write(
440+
frameBuilder()
441+
.command(ACK_COMMAND)
442+
.header(MESSAGE_ID, lastMessageId.get())
443+
.build());
444+
}
445+
446+
return line.contains(NULL) && filteredConsumedMessageCount.get() == expectedCount;
447+
});
448+
assertThat(filteredConsumedMessageCount).hasValue(expectedCount);
449+
assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2);
450+
} finally {
451+
stompDisconnect();
452+
env.deleteStream(s);
453+
}
454+
});
455+
}
456+
457+
private void publishStomp(
458+
int messageCount, String destination, Supplier<String> filterValueSupplier) throws Exception {
459+
String messageBody = UUID.randomUUID().toString();
460+
for (int i = 0; i < messageCount; i++) {
461+
String receipt = UUID.randomUUID().toString();
462+
byte[] frame =
463+
frameBuilder()
464+
.command("SEND")
465+
.header("destination", destination)
466+
.header("content-type", "text/plain")
467+
.header("content-length", String.valueOf(messageBody.length()))
468+
.header("some-header", "some header value")
469+
.header("receipt", receipt)
470+
.header(X_STREAM_FILTER_VALUE, filterValueSupplier.get())
471+
.body(messageBody)
472+
.build();
473+
out.write(frame);
474+
waitForReceipt(receipt);
475+
}
353476
}
354477

355478
void write(byte[] content) {
@@ -413,6 +536,7 @@ void offsetTypeLastShouldStartConsumingFromTheLastChunk() throws Exception {
413536

414537
assertThat(first.get()).isEqualTo(chunkOffset.get());
415538
assertThat(last.get()).isEqualTo(lastOffset);
539+
stompDisconnect();
416540
}
417541

418542
@Test
@@ -470,6 +594,7 @@ void offsetTypeNextShouldReturnNewPublishedMessages() throws Exception {
470594

471595
assertThat(first.get()).isEqualTo(firstWaveMessageCount);
472596
assertThat(last.get()).isEqualTo(lastOffset);
597+
stompDisconnect();
473598
}
474599

475600
@Test
@@ -512,6 +637,7 @@ void offsetTypeOffsetShouldStartConsumingFromOffset() throws Exception {
512637

513638
assertThat(first.get()).isEqualTo(offset);
514639
assertThat(last.get()).isEqualTo(messageCount - 1);
640+
stompDisconnect();
515641
}
516642

517643
@Test
@@ -570,6 +696,7 @@ void offsetTypeTimestampShouldStartConsumingFromTimestamp() throws Exception {
570696
assertThat(last.get()).isEqualTo(lastOffset);
571697
consumed.stream()
572698
.forEach(v -> assertThat(v).startsWith("second wave").doesNotStartWith("first wave"));
699+
stompDisconnect();
573700
}
574701

575702
private static class FrameBuilder {

0 commit comments

Comments
 (0)