Skip to content

Commit 7dee98b

Browse files
ColoredCarrotartembilan
authored andcommitted
GH-8678: Add BufferOverflowStrategy for WebScoket
Within Spring Integration's WebSocket support, a `ConcurrentWebSocketSessionDecorator`, which buffers outbound messages if sending is slow, is used to decorate all websocket sessions in `IntegrationWebSocketContainer`, the standard entrypoint for using websockets with Integration. * Expose a `ConcurrentWebSocketSessionDecorator.OverflowStrategy` option on the `IntegrationWebSocketContainer` **Cherry-pick to `5.5.x`, `6.0.x` & `6.1.x`** (cherry picked from commit be53593)
1 parent c267bd5 commit 7dee98b

File tree

2 files changed

+81
-4
lines changed

2 files changed

+81
-4
lines changed

spring-integration-websocket/src/main/java/org/springframework/integration/websocket/IntegrationWebSocketContainer.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.commons.logging.LogFactory;
2727

2828
import org.springframework.beans.factory.DisposableBean;
29+
import org.springframework.lang.Nullable;
2930
import org.springframework.util.Assert;
3031
import org.springframework.util.ReflectionUtils;
3132
import org.springframework.web.socket.CloseStatus;
@@ -53,6 +54,7 @@
5354
*
5455
* @author Artem Bilan
5556
* @author Gary Russell
57+
* @author Julian Koch
5658
*
5759
* @since 4.1
5860
*
@@ -79,6 +81,9 @@ public abstract class IntegrationWebSocketContainer implements DisposableBean {
7981

8082
private int sendBufferSizeLimit = DEFAULT_SEND_BUFFER_SIZE;
8183

84+
@Nullable
85+
private ConcurrentWebSocketSessionDecorator.OverflowStrategy sendBufferOverflowStrategy;
86+
8287
public void setSendTimeLimit(int sendTimeLimit) {
8388
this.sendTimeLimit = sendTimeLimit;
8489
}
@@ -87,6 +92,21 @@ public void setSendBufferSizeLimit(int sendBufferSizeLimit) {
8792
this.sendBufferSizeLimit = sendBufferSizeLimit;
8893
}
8994

95+
/**
96+
* Set the send buffer overflow strategy.
97+
* <p>Concurrently generated outbound messages are buffered if sending is slow.
98+
* This strategy determines the behavior when the buffer has reached the limit
99+
* configured with {@link #setSendBufferSizeLimit}.
100+
* @param overflowStrategy The {@link ConcurrentWebSocketSessionDecorator.OverflowStrategy} to use.
101+
* @since 5.5.19
102+
* @see ConcurrentWebSocketSessionDecorator
103+
*/
104+
public void setSendBufferOverflowStrategy(
105+
@Nullable ConcurrentWebSocketSessionDecorator.OverflowStrategy overflowStrategy) {
106+
107+
this.sendBufferOverflowStrategy = overflowStrategy;
108+
}
109+
90110
public void setMessageListener(WebSocketListener messageListener) {
91111
Assert.state(this.messageListener == null || this.messageListener.equals(messageListener),
92112
"'messageListener' is already configured");
@@ -162,6 +182,17 @@ public void destroy() {
162182
}
163183
}
164184

185+
private WebSocketSession decorateSession(WebSocketSession sessionToDecorate) {
186+
if (this.sendBufferOverflowStrategy == null) {
187+
return new ConcurrentWebSocketSessionDecorator(sessionToDecorate, this.sendTimeLimit,
188+
this.sendBufferSizeLimit);
189+
}
190+
else {
191+
return new ConcurrentWebSocketSessionDecorator(sessionToDecorate, this.sendTimeLimit,
192+
this.sendBufferSizeLimit, this.sendBufferOverflowStrategy);
193+
}
194+
}
195+
165196
/**
166197
* An internal {@link WebSocketHandler} implementation to be used with native
167198
* Web-Socket containers.
@@ -183,10 +214,7 @@ public List<String> getSubProtocols() {
183214
public void afterConnectionEstablished(WebSocketSession sessionToDecorate)
184215
throws Exception { // NOSONAR
185216

186-
WebSocketSession session =
187-
new ConcurrentWebSocketSessionDecorator(sessionToDecorate,
188-
IntegrationWebSocketContainer.this.sendTimeLimit,
189-
IntegrationWebSocketContainer.this.sendBufferSizeLimit);
217+
WebSocketSession session = decorateSession(sessionToDecorate);
190218

191219
IntegrationWebSocketContainer.this.sessions.put(session.getId(), session);
192220
if (IntegrationWebSocketContainer.this.logger.isDebugEnabled()) {

spring-integration-websocket/src/test/java/org/springframework/integration/websocket/ClientWebSocketContainerTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.junit.jupiter.api.Test;
3535

3636
import org.springframework.http.HttpHeaders;
37+
import org.springframework.integration.test.util.TestUtils;
3738
import org.springframework.web.socket.CloseStatus;
3839
import org.springframework.web.socket.PingMessage;
3940
import org.springframework.web.socket.PongMessage;
@@ -42,12 +43,14 @@
4243
import org.springframework.web.socket.WebSocketMessage;
4344
import org.springframework.web.socket.WebSocketSession;
4445
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
46+
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
4547

4648
import static org.assertj.core.api.Assertions.assertThat;
4749
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
4850

4951
/**
5052
* @author Artem Bilan
53+
* @author Julian Koch
5154
*
5255
* @since 4.1
5356
*/
@@ -138,16 +141,53 @@ protected CompletableFuture<WebSocketSession> executeInternal(WebSocketHandler w
138141
assertThat(session.isOpen()).isTrue();
139142
}
140143

144+
@Test
145+
public void testWebSocketContainerOverflowStrategyPropagation() throws Exception {
146+
StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
147+
148+
Map<String, Object> userProperties = new HashMap<>();
149+
userProperties.put(Constants.IO_TIMEOUT_MS_PROPERTY, "" + (Constants.IO_TIMEOUT_MS_DEFAULT * 6));
150+
webSocketClient.setUserProperties(userProperties);
151+
152+
ClientWebSocketContainer container =
153+
new ClientWebSocketContainer(webSocketClient, new URI(server.getWsBaseUrl() + "/ws/websocket"));
154+
155+
container.setSendTimeLimit(10_000);
156+
container.setSendBufferSizeLimit(12345);
157+
container.setSendBufferOverflowStrategy(ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP);
158+
159+
TestWebSocketListener messageListener = new TestWebSocketListener();
160+
container.setMessageListener(messageListener);
161+
container.setConnectionTimeout(30);
162+
163+
container.start();
164+
165+
assertThat(messageListener.sessionStartedLatch.await(10, TimeUnit.SECONDS)).isTrue();
166+
167+
assertThat(messageListener.sendTimeLimit).isEqualTo(10_000);
168+
assertThat(messageListener.sendBufferSizeLimit).isEqualTo(12345);
169+
assertThat(messageListener.sendBufferOverflowStrategy)
170+
.isEqualTo(ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP);
171+
}
172+
141173
private static class TestWebSocketListener implements WebSocketListener {
142174

143175
public final CountDownLatch messageLatch = new CountDownLatch(1);
144176

177+
public final CountDownLatch sessionStartedLatch = new CountDownLatch(1);
178+
145179
public final CountDownLatch sessionEndedLatch = new CountDownLatch(1);
146180

147181
public WebSocketMessage<?> message;
148182

149183
public boolean started;
150184

185+
int sendTimeLimit;
186+
187+
int sendBufferSizeLimit;
188+
189+
ConcurrentWebSocketSessionDecorator.OverflowStrategy sendBufferOverflowStrategy;
190+
151191
TestWebSocketListener() {
152192
}
153193

@@ -160,6 +200,15 @@ public void onMessage(WebSocketSession session, WebSocketMessage<?> message) {
160200
@Override
161201
public void afterSessionStarted(WebSocketSession session) {
162202
this.started = true;
203+
204+
var sessionDecorator = (ConcurrentWebSocketSessionDecorator) session;
205+
this.sendTimeLimit = sessionDecorator.getSendTimeLimit();
206+
this.sendBufferSizeLimit = sessionDecorator.getBufferSizeLimit();
207+
this.sendBufferOverflowStrategy =
208+
TestUtils.getPropertyValue(sessionDecorator, "overflowStrategy",
209+
ConcurrentWebSocketSessionDecorator.OverflowStrategy.class);
210+
211+
this.sessionStartedLatch.countDown();
163212
}
164213

165214
@Override

0 commit comments

Comments
 (0)