Skip to content

Commit 09efe3f

Browse files
committed
Enforce super stream producer closing
Fixes #193
1 parent 6244e61 commit 09efe3f

File tree

2 files changed

+77
-24
lines changed

2 files changed

+77
-24
lines changed

src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java

+37-24
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Map.Entry;
2828
import java.util.concurrent.ConcurrentHashMap;
2929
import java.util.concurrent.CopyOnWriteArrayList;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3031
import org.slf4j.Logger;
3132
import org.slf4j.LoggerFactory;
3233

@@ -42,6 +43,7 @@ class SuperStreamProducer implements Producer {
4243
private final StreamEnvironment environment;
4344
private final String name;
4445
private final Metadata superStreamMetadata;
46+
private final AtomicBoolean closed = new AtomicBoolean(false);
4547

4648
SuperStreamProducer(
4749
StreamProducerBuilder producerBuilder,
@@ -91,36 +93,47 @@ public long getLastPublishingId() {
9193

9294
@Override
9395
public void send(Message message, ConfirmationHandler confirmationHandler) {
94-
List<String> streams = this.routingStrategy.route(message, superStreamMetadata);
95-
if (streams.isEmpty()) {
96-
confirmationHandler.handle(
97-
new ConfirmationStatus(message, false, Constants.CODE_NO_ROUTE_FOUND));
98-
} else {
99-
for (String stream : streams) {
100-
Producer producer =
101-
producers.computeIfAbsent(
102-
stream,
103-
stream1 -> {
104-
Producer p =
105-
producerBuilder.duplicate().superStream(null).stream(stream1).build();
106-
return p;
107-
});
108-
producer.send(message, confirmationHandler);
96+
if (canSend()) {
97+
List<String> streams = this.routingStrategy.route(message, superStreamMetadata);
98+
if (streams.isEmpty()) {
99+
confirmationHandler.handle(
100+
new ConfirmationStatus(message, false, Constants.CODE_NO_ROUTE_FOUND));
101+
} else {
102+
for (String stream : streams) {
103+
Producer producer =
104+
producers.computeIfAbsent(
105+
stream,
106+
stream1 -> {
107+
Producer p =
108+
producerBuilder.duplicate().superStream(null).stream(stream1).build();
109+
return p;
110+
});
111+
producer.send(message, confirmationHandler);
112+
}
109113
}
114+
} else {
115+
confirmationHandler.handle(
116+
new ConfirmationStatus(message, false, Constants.CODE_PRODUCER_CLOSED));
110117
}
111118
}
112119

120+
private boolean canSend() {
121+
return !this.closed.get();
122+
}
123+
113124
@Override
114125
public void close() {
115-
for (Entry<String, Producer> entry : producers.entrySet()) {
116-
try {
117-
entry.getValue().close();
118-
} catch (Exception e) {
119-
LOGGER.info(
120-
"Error while closing producer for partition {} of super stream {}: {}",
121-
entry.getKey(),
122-
this.superStream,
123-
e.getMessage());
126+
if (this.closed.compareAndSet(false, true)) {
127+
for (Entry<String, Producer> entry : producers.entrySet()) {
128+
try {
129+
entry.getValue().close();
130+
} catch (Exception e) {
131+
LOGGER.info(
132+
"Error while closing producer for partition {} of super stream {}: {}",
133+
entry.getKey(),
134+
this.superStream,
135+
e.getMessage());
136+
}
124137
}
125138
}
126139
}

src/test/java/com/rabbitmq/stream/impl/SuperStreamProducerTest.java

+40
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.rabbitmq.stream.Producer;
3131
import io.netty.channel.EventLoopGroup;
3232
import java.util.Map;
33+
import java.util.Set;
3334
import java.util.UUID;
3435
import java.util.concurrent.ConcurrentHashMap;
3536
import java.util.concurrent.CountDownLatch;
@@ -290,4 +291,43 @@ void getLastPublishingIdShouldReturnLowestValue() throws Exception {
290291
assertThat(counts.values().stream().map(AtomicLong::get).reduce(0L, Long::sum))
291292
.isEqualTo(messageCount);
292293
}
294+
295+
@Test
296+
void producerShouldNotPublishMessagesOnceClosed() throws Exception {
297+
int messageCount = 100;
298+
declareSuperStreamTopology(connection, superStream, partitions);
299+
String producerName = "super-stream-application";
300+
Producer producer =
301+
environment
302+
.producerBuilder()
303+
.name(producerName)
304+
.superStream(superStream)
305+
.routing(message -> message.getProperties().getMessageIdAsString())
306+
.producerBuilder()
307+
.build();
308+
309+
producer.close();
310+
311+
Set<Short> confirmationCodes = ConcurrentHashMap.newKeySet(1);
312+
CountDownLatch publishLatch = new CountDownLatch(messageCount);
313+
314+
IntStream.range(0, messageCount)
315+
.forEach(
316+
i ->
317+
producer.send(
318+
producer
319+
.messageBuilder()
320+
.publishingId(i)
321+
.properties()
322+
.messageId(UUID.randomUUID().toString())
323+
.messageBuilder()
324+
.build(),
325+
confirmationStatus -> {
326+
confirmationCodes.add(confirmationStatus.getCode());
327+
publishLatch.countDown();
328+
}));
329+
330+
assertThat(latchAssert(publishLatch)).completes(5);
331+
assertThat(confirmationCodes).hasSize(1).containsExactly(Constants.CODE_PRODUCER_CLOSED);
332+
}
293333
}

0 commit comments

Comments
 (0)