Skip to content

Commit 0e3826b

Browse files
committed
Fail message if no route is found
1 parent b4a8ffe commit 0e3826b

File tree

3 files changed

+53
-9
lines changed

3 files changed

+53
-9
lines changed

src/main/java/com/rabbitmq/stream/Constants.java

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public final class Constants {
3939
public static final short CODE_PRODUCER_NOT_AVAILABLE = 10_002;
4040
public static final short CODE_PRODUCER_CLOSED = 10_003;
4141
public static final short CODE_PUBLISH_CONFIRM_TIMEOUT = 10_004;
42+
public static final short CODE_NO_ROUTE_FOUND = 10_005;
4243

4344
public static final short COMMAND_DECLARE_PUBLISHER = 1;
4445
public static final short COMMAND_PUBLISH = 2;

src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java

+16-9
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import com.rabbitmq.stream.Codec;
1717
import com.rabbitmq.stream.ConfirmationHandler;
18+
import com.rabbitmq.stream.ConfirmationStatus;
19+
import com.rabbitmq.stream.Constants;
1820
import com.rabbitmq.stream.Message;
1921
import com.rabbitmq.stream.MessageBuilder;
2022
import com.rabbitmq.stream.Producer;
@@ -92,15 +94,20 @@ public void send(Message message, ConfirmationHandler confirmationHandler) {
9294
// TODO handle when the stream is not found (no partition found for the message)
9395
// and call the confirmation handler with a failure
9496
List<String> streams = this.routingStrategy.route(message, superStreamMetadata);
95-
for (String stream : streams) {
96-
Producer producer =
97-
producers.computeIfAbsent(
98-
stream,
99-
stream1 -> {
100-
Producer p = producerBuilder.duplicate().stream(stream1).build();
101-
return p;
102-
});
103-
producer.send(message, confirmationHandler);
97+
if (streams.isEmpty()) {
98+
confirmationHandler.handle(
99+
new ConfirmationStatus(message, false, Constants.CODE_NO_ROUTE_FOUND));
100+
} else {
101+
for (String stream : streams) {
102+
Producer producer =
103+
producers.computeIfAbsent(
104+
stream,
105+
stream1 -> {
106+
Producer p = producerBuilder.duplicate().stream(stream1).build();
107+
return p;
108+
});
109+
producer.send(message, confirmationHandler);
110+
}
104111
}
105112
}
106113

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

+36
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.rabbitmq.client.Connection;
2424
import com.rabbitmq.client.ConnectionFactory;
25+
import com.rabbitmq.stream.Constants;
2526
import com.rabbitmq.stream.Environment;
2627
import com.rabbitmq.stream.EnvironmentBuilder;
2728
import com.rabbitmq.stream.OffsetSpecification;
@@ -31,6 +32,8 @@
3132
import java.util.UUID;
3233
import java.util.concurrent.ConcurrentHashMap;
3334
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.concurrent.atomic.AtomicInteger;
3437
import java.util.concurrent.atomic.AtomicLong;
3538
import java.util.stream.IntStream;
3639
import org.junit.jupiter.api.AfterEach;
@@ -175,6 +178,39 @@ void allMessagesSentToSuperStreamWithRoutingKeyRoutingShouldBeThenConsumed() thr
175178
.isEqualTo(messageCount);
176179
}
177180

181+
@Test
182+
void messageIsNackedIfNoRouteFound() throws Exception {
183+
int messageCount = 10_000;
184+
routingKeys = new String[] {"amer", "emea", "apac"};
185+
declareSuperStreamTopology(connection, superStream, routingKeys);
186+
Producer producer =
187+
environment.producerBuilder().stream(superStream)
188+
.routing(message -> message.getApplicationProperties().get("region").toString())
189+
.key()
190+
.producerBuilder()
191+
.build();
192+
193+
AtomicBoolean confirmed = new AtomicBoolean(true);
194+
AtomicInteger code = new AtomicInteger();
195+
CountDownLatch publishLatch = new CountDownLatch(1);
196+
producer.send(
197+
producer
198+
.messageBuilder()
199+
.applicationProperties()
200+
.entry("region", "atlantis")
201+
.messageBuilder()
202+
.build(),
203+
confirmationStatus -> {
204+
confirmed.set(confirmationStatus.isConfirmed());
205+
code.set(confirmationStatus.getCode());
206+
publishLatch.countDown();
207+
});
208+
209+
assertThat(latchAssert(publishLatch)).completes(5);
210+
assertThat(confirmed).isFalse();
211+
assertThat(code).hasValue(Constants.CODE_NO_ROUTE_FOUND);
212+
}
213+
178214
@Test
179215
void getLastPublishingIdShouldReturnLowestValue() throws Exception {
180216
int messageCount = 10_000;

0 commit comments

Comments
 (0)