Skip to content

Commit 72d1671

Browse files
committed
Add test cases for partitions and route
Make sure partitions are returned in correct order and non-stream queues are filtered out.
1 parent 0c015ab commit 72d1671

File tree

2 files changed

+54
-6
lines changed

2 files changed

+54
-6
lines changed

src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
import static java.util.stream.Collectors.toList;
1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import com.rabbitmq.client.Channel;
2122
import com.rabbitmq.client.Connection;
2223
import com.rabbitmq.client.ConnectionFactory;
2324
import com.rabbitmq.stream.impl.TestUtils.BrokerVersionAtLeast;
25+
import java.util.Arrays;
2426
import java.util.List;
2527
import java.util.UUID;
2628
import java.util.stream.IntStream;
@@ -114,4 +116,36 @@ void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception
114116
.hasSize(2)
115117
.contains(superStream + "-0", superStream + "-1");
116118
}
119+
120+
@Test
121+
@BrokerVersionAtLeast("3.9.6")
122+
void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception {
123+
declareSuperStreamTopology(connection, superStream, 3);
124+
Channel channel = connection.createChannel();
125+
String nonStreamQueue = channel.queueDeclare().getQueue();
126+
connection.createChannel().queueBind(nonStreamQueue, superStream, "not-a-stream");
127+
Client client = cf.get();
128+
List<String> streams = client.partitions(superStream);
129+
assertThat(streams)
130+
.hasSize(partitions)
131+
.contains(
132+
IntStream.range(0, partitions)
133+
.mapToObj(i -> superStream + "-" + i)
134+
.toArray(String[]::new));
135+
List<String> routes = client.route("not-a-stream", superStream);
136+
assertThat(routes).isEmpty();
137+
}
138+
139+
@Test
140+
@BrokerVersionAtLeast("3.9.6")
141+
void partitionsReturnsCorrectOrder() throws Exception {
142+
String[] partitionNames = {"z", "y", "x"};
143+
declareSuperStreamTopology(connection, superStream, partitionNames);
144+
Client client = cf.get();
145+
List<String> streams = client.partitions(superStream);
146+
assertThat(streams)
147+
.hasSize(partitions)
148+
.containsSequence(
149+
Arrays.stream(partitionNames).map(p -> superStream + "-" + p).toArray(String[]::new));
150+
}
117151
}

src/test/java/com/rabbitmq/stream/impl/TestUtils.java

+20-6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import com.rabbitmq.stream.impl.Client.StreamMetadata;
3232
import io.netty.channel.EventLoopGroup;
3333
import io.netty.channel.nio.NioEventLoopGroup;
34+
import io.vavr.Tuple;
35+
import io.vavr.Tuple2;
3436
import java.io.BufferedReader;
3537
import java.io.IOException;
3638
import java.io.InputStream;
@@ -45,6 +47,7 @@
4547
import java.lang.reflect.Method;
4648
import java.nio.charset.StandardCharsets;
4749
import java.time.Duration;
50+
import java.util.ArrayList;
4851
import java.util.Collection;
4952
import java.util.Collections;
5053
import java.util.List;
@@ -231,17 +234,28 @@ static void declareSuperStreamTopology(Connection connection, String superStream
231234
IntStream.range(0, partitions).mapToObj(String::valueOf).toArray(String[]::new));
232235
}
233236

234-
static void declareSuperStreamTopology(
235-
Connection connection, String superStream, String... routingKeys) throws Exception {
237+
static void declareSuperStreamTopology(Connection connection, String superStream, String... rks)
238+
throws Exception {
236239
try (Channel ch = connection.createChannel()) {
237240
ch.exchangeDeclare(superStream, BuiltinExchangeType.DIRECT, true);
238-
for (String routingKey : routingKeys) {
241+
242+
List<Tuple2<String, Integer>> bindings = new ArrayList<>(rks.length);
243+
for (int i = 0; i < rks.length; i++) {
244+
bindings.add(Tuple.of(rks[i], i));
245+
}
246+
// shuffle the order to make sure we get in the correct order from the server
247+
Collections.shuffle(bindings);
248+
249+
for (Tuple2<String, Integer> binding : bindings) {
250+
String routingKey = binding._1();
239251
String partitionName = superStream + "-" + routingKey;
240252
ch.queueDeclare(
241253
partitionName, true, false, false, Collections.singletonMap("x-queue-type", "stream"));
242-
// TODO consider adding some arguments to the bindings
243-
// can be useful to identify a partition, e.g. partition number
244-
ch.queueBind(partitionName, superStream, routingKey);
254+
ch.queueBind(
255+
partitionName,
256+
superStream,
257+
routingKey,
258+
Collections.singletonMap("x-stream-partition-order", binding._2()));
245259
}
246260
}
247261
}

0 commit comments

Comments
 (0)