Skip to content

Commit 4195e69

Browse files
committed
OrderedMessageChannelDecorator supports multiple subscribers
See gh-21798
1 parent d62d7f5 commit 4195e69

File tree

2 files changed

+63
-28
lines changed

2 files changed

+63
-28
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageChannelDecorator.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Queue;
2020
import java.util.concurrent.ConcurrentLinkedQueue;
2121
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicInteger;
2223

2324
import org.apache.commons.logging.Log;
2425

@@ -48,6 +49,8 @@ public class OrderedMessageChannelDecorator implements MessageChannel {
4849

4950
private final MessageChannel channel;
5051

52+
private final int subscriberCount;
53+
5154
private final Log logger;
5255

5356
private final Queue<Message<?>> messages = new ConcurrentLinkedQueue<>();
@@ -57,6 +60,7 @@ public class OrderedMessageChannelDecorator implements MessageChannel {
5760

5861
public OrderedMessageChannelDecorator(MessageChannel channel, Log logger) {
5962
this.channel = channel;
63+
this.subscriberCount = (channel instanceof ExecutorSubscribableChannel ch ? ch.getSubscribers().size() : 0);
6064
this.logger = logger;
6165
}
6266

@@ -162,24 +166,30 @@ public static Runnable getNextMessageTask(Message<?> message) {
162166
/**
163167
* Remove handled message from queue, and send next message.
164168
*/
165-
private class PostHandleTask implements Runnable {
169+
private final class PostHandleTask implements Runnable {
166170

167171
private final Message<?> message;
168172

173+
@Nullable
174+
private final AtomicInteger handledCount;
175+
169176
private PostHandleTask(Message<?> message) {
170177
this.message = message;
178+
this.handledCount = (subscriberCount > 1 ? new AtomicInteger(0) : null);
171179
}
172180

173181
@Override
174182
public void run() {
175-
if (OrderedMessageChannelDecorator.this.removeMessage(message)) {
176-
sendNextMessage();
183+
if (this.handledCount == null || this.handledCount.addAndGet(1) == subscriberCount) {
184+
if (OrderedMessageChannelDecorator.this.removeMessage(message)) {
185+
sendNextMessage();
186+
}
177187
}
178188
}
179189
}
180190

181191

182-
private static class CallbackTaskInterceptor implements ExecutorChannelInterceptor {
192+
private final static class CallbackTaskInterceptor implements ExecutorChannelInterceptor {
183193

184194
@Override
185195
public void afterMessageHandled(

spring-messaging/src/test/java/org/springframework/messaging/simp/broker/OrderedMessageChannelDecoratorTests.java

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,9 @@
2727
import org.junit.jupiter.api.BeforeEach;
2828
import org.junit.jupiter.api.Test;
2929

30+
import org.springframework.messaging.Message;
31+
import org.springframework.messaging.MessageHandler;
32+
import org.springframework.messaging.MessagingException;
3033
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
3134
import org.springframework.messaging.simp.SimpMessageType;
3235
import org.springframework.messaging.support.ExecutorSubscribableChannel;
@@ -45,10 +48,6 @@ public class OrderedMessageChannelDecoratorTests {
4548
private static final Log logger = LogFactory.getLog(OrderedMessageChannelDecoratorTests.class);
4649

4750

48-
private OrderedMessageChannelDecorator sender;
49-
50-
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(this.executor);
51-
5251
private ThreadPoolTaskExecutor executor;
5352

5453

@@ -58,12 +57,6 @@ public void setup() {
5857
this.executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
5958
this.executor.setAllowCoreThreadTimeOut(true);
6059
this.executor.afterPropertiesSet();
61-
62-
this.channel = new ExecutorSubscribableChannel(this.executor);
63-
OrderedMessageChannelDecorator.configureInterceptor(this.channel, true);
64-
65-
this.sender = new OrderedMessageChannelDecorator(this.channel, logger);
66-
6760
}
6861

6962
@AfterEach
@@ -78,11 +71,48 @@ public void test() throws InterruptedException {
7871
int start = 1;
7972
int end = 1000;
8073

81-
AtomicInteger index = new AtomicInteger(start);
82-
AtomicReference<Object> result = new AtomicReference<>();
83-
CountDownLatch latch = new CountDownLatch(1);
74+
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(this.executor);
75+
OrderedMessageChannelDecorator.configureInterceptor(channel, true);
76+
77+
TestHandler handler1 = new TestHandler(start, end);
78+
TestHandler handler2 = new TestHandler(start, end);
79+
TestHandler handler3 = new TestHandler(start, end);
80+
81+
channel.subscribe(handler1);
82+
channel.subscribe(handler2);
83+
channel.subscribe(handler3);
84+
85+
OrderedMessageChannelDecorator sender = new OrderedMessageChannelDecorator(channel, logger);
86+
for (int i = start; i <= end; i++) {
87+
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
88+
accessor.setHeader("seq", i);
89+
accessor.setLeaveMutable(true);
90+
sender.send(MessageBuilder.createMessage("payload", accessor.getMessageHeaders()));
91+
}
92+
93+
handler1.verify();
94+
handler2.verify();
95+
handler3.verify();
96+
}
8497

85-
this.channel.subscribe(message -> {
98+
99+
private static class TestHandler implements MessageHandler {
100+
101+
private final AtomicInteger index;
102+
103+
private final int end;
104+
105+
private final AtomicReference<Object> result = new AtomicReference<>();
106+
107+
private final CountDownLatch latch = new CountDownLatch(1);
108+
109+
TestHandler(int start, int end) {
110+
this.index = new AtomicInteger(start);
111+
this.end = end;
112+
}
113+
114+
@Override
115+
public void handleMessage(Message<?> message) throws MessagingException {
86116
int expected = index.getAndIncrement();
87117
Integer actual = (Integer) message.getHeaders().getOrDefault("seq", -1);
88118
if (actual != expected) {
@@ -104,17 +134,12 @@ public void test() throws InterruptedException {
104134
result.set("Done");
105135
latch.countDown();
106136
}
107-
});
108-
109-
for (int i = start; i <= end; i++) {
110-
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
111-
accessor.setHeader("seq", i);
112-
accessor.setLeaveMutable(true);
113-
this.sender.send(MessageBuilder.createMessage("payload", accessor.getMessageHeaders()));
114137
}
115138

116-
latch.await(10, TimeUnit.SECONDS);
117-
assertThat(result.get()).isEqualTo("Done");
139+
void verify() throws InterruptedException {
140+
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue();
141+
assertThat(result.get()).isEqualTo("Done");
142+
}
118143
}
119144

120145
}

0 commit comments

Comments
 (0)