Skip to content

Commit 5dd271a

Browse files
committed
spring-projectsGH-8678: Add test, remove explicit default
1 parent 0d8e89c commit 5dd271a

File tree

2 files changed

+75
-20
lines changed

2 files changed

+75
-20
lines changed

spring-integration-websocket/src/main/java/org/springframework/integration/websocket/IntegrationWebSocketContainer.java

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
import org.springframework.beans.factory.DisposableBean;
3131
import org.springframework.lang.NonNull;
32+
import org.springframework.lang.Nullable;
3233
import org.springframework.util.Assert;
3334
import org.springframework.util.ReflectionUtils;
3435
import org.springframework.web.socket.CloseStatus;
@@ -56,6 +57,7 @@
5657
*
5758
* @author Artem Bilan
5859
* @author Gary Russell
60+
* @author Julian Koch
5961
*
6062
* @since 4.1
6163
*
@@ -68,9 +70,6 @@ public abstract class IntegrationWebSocketContainer implements DisposableBean {
6870

6971
public static final int DEFAULT_SEND_BUFFER_SIZE = 512 * 1024;
7072

71-
public static final ConcurrentWebSocketSessionDecorator.OverflowStrategy DEFAULT_SEND_BUFFER_OVERFLOW_STRATEGY =
72-
ConcurrentWebSocketSessionDecorator.OverflowStrategy.TERMINATE;
73-
7473
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR
7574

7675
protected final Lock lock = new ReentrantLock();
@@ -87,9 +86,8 @@ public abstract class IntegrationWebSocketContainer implements DisposableBean {
8786

8887
private int sendBufferSizeLimit = DEFAULT_SEND_BUFFER_SIZE;
8988

90-
@NonNull
91-
private ConcurrentWebSocketSessionDecorator.OverflowStrategy sendBufferOverflowStrategy =
92-
DEFAULT_SEND_BUFFER_OVERFLOW_STRATEGY;
89+
@Nullable
90+
private ConcurrentWebSocketSessionDecorator.OverflowStrategy sendBufferOverflowStrategy;
9391

9492
public void setSendTimeLimit(int sendTimeLimit) {
9593
this.sendTimeLimit = sendTimeLimit;
@@ -101,19 +99,14 @@ public void setSendBufferSizeLimit(int sendBufferSizeLimit) {
10199

102100
/**
103101
* Set the send buffer overflow strategy.
104-
* <p>
105-
* Concurrently generated outbound messages are buffered if sending is slow.
102+
* <p>Concurrently generated outbound messages are buffered if sending is slow.
106103
* This strategy determines the behavior when the buffer has reached the limit
107104
* configured with {@link #setSendBufferSizeLimit}.
108-
* <p>
109-
* By default, the session associated with the culpable message is terminated.
110-
*
111-
* @param overflowStrategy The overflow strategy to use (see {@link ConcurrentWebSocketSessionDecorator.OverflowStrategy}).
112-
*
105+
* @param overflowStrategy The overflow strategy to use (see {@link ConcurrentWebSocketSessionDecorator.OverflowStrategy}),
106+
* or {@code null} to use the default as specified by {@link ConcurrentWebSocketSessionDecorator}.
113107
* @see ConcurrentWebSocketSessionDecorator
114108
*/
115-
public void setSendBufferOverflowStrategy(@NonNull ConcurrentWebSocketSessionDecorator.OverflowStrategy overflowStrategy) {
116-
Assert.notNull(overflowStrategy, "Overflow strategy must not be null");
109+
public void setSendBufferOverflowStrategy(@Nullable ConcurrentWebSocketSessionDecorator.OverflowStrategy overflowStrategy) {
117110
this.sendBufferOverflowStrategy = overflowStrategy;
118111
}
119112

@@ -213,11 +206,7 @@ public List<String> getSubProtocols() {
213206
public void afterConnectionEstablished(WebSocketSession sessionToDecorate)
214207
throws Exception { // NOSONAR
215208

216-
WebSocketSession session =
217-
new ConcurrentWebSocketSessionDecorator(sessionToDecorate,
218-
IntegrationWebSocketContainer.this.sendTimeLimit,
219-
IntegrationWebSocketContainer.this.sendBufferSizeLimit,
220-
IntegrationWebSocketContainer.this.sendBufferOverflowStrategy);
209+
WebSocketSession session = decorateSession(sessionToDecorate);
221210

222211
IntegrationWebSocketContainer.this.sessions.put(session.getId(), session);
223212
if (IntegrationWebSocketContainer.this.logger.isDebugEnabled()) {
@@ -267,6 +256,16 @@ public boolean supportsPartialMessages() {
267256
return false;
268257
}
269258

259+
private WebSocketSession decorateSession(@NonNull WebSocketSession sessionToDecorate) {
260+
return (IntegrationWebSocketContainer.this.sendBufferOverflowStrategy == null
261+
? new ConcurrentWebSocketSessionDecorator(sessionToDecorate,
262+
IntegrationWebSocketContainer.this.sendTimeLimit,
263+
IntegrationWebSocketContainer.this.sendBufferSizeLimit)
264+
: new ConcurrentWebSocketSessionDecorator(sessionToDecorate,
265+
IntegrationWebSocketContainer.this.sendTimeLimit,
266+
IntegrationWebSocketContainer.this.sendBufferSizeLimit,
267+
IntegrationWebSocketContainer.this.sendBufferOverflowStrategy));
268+
}
270269
}
271270

272271
}

spring-integration-websocket/src/test/java/org/springframework/integration/websocket/ClientWebSocketContainerTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.junit.jupiter.api.Test;
3535

3636
import org.springframework.http.HttpHeaders;
37+
import org.springframework.integration.test.util.TestUtils;
38+
import org.springframework.lang.Nullable;
3739
import org.springframework.web.socket.CloseStatus;
3840
import org.springframework.web.socket.PingMessage;
3941
import org.springframework.web.socket.PongMessage;
@@ -42,6 +44,7 @@
4244
import org.springframework.web.socket.WebSocketMessage;
4345
import org.springframework.web.socket.WebSocketSession;
4446
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
47+
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
4548

4649
import static org.assertj.core.api.Assertions.assertThat;
4750
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
@@ -138,16 +141,60 @@ protected CompletableFuture<WebSocketSession> executeInternal(WebSocketHandler w
138141
assertThat(session.isOpen()).isTrue();
139142
}
140143

144+
@Test
145+
public void testWebSocketContainerOverflowStrategyPropagation() throws Exception {
146+
StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
147+
148+
Map<String, Object> userProperties = new HashMap<>();
149+
userProperties.put(Constants.IO_TIMEOUT_MS_PROPERTY, "" + (Constants.IO_TIMEOUT_MS_DEFAULT * 6));
150+
webSocketClient.setUserProperties(userProperties);
151+
152+
ClientWebSocketContainer container =
153+
new ClientWebSocketContainer(webSocketClient, new URI(server.getWsBaseUrl() + "/ws/websocket"));
154+
155+
// We expect the options we set here to be propagated to the concrete ConcurrentWebSocketSessionDecorator
156+
container.setSendTimeLimit(10_000);
157+
container.setSendBufferSizeLimit(12345);
158+
container.setSendBufferOverflowStrategy(ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP);
159+
160+
TestWebSocketListener messageListener = new TestWebSocketListener();
161+
container.setMessageListener(messageListener);
162+
container.setConnectionTimeout(30);
163+
164+
container.start();
165+
166+
// We must wait at least until the session has been started before we can check the propagated options
167+
assertThat(messageListener.sessionStartedLatch.await(10, TimeUnit.SECONDS)).isTrue();
168+
169+
assertThat(messageListener.optionsPropagatedToSession).isNotNull();
170+
assertThat(messageListener.optionsPropagatedToSession.sendTimeLimit).isEqualTo(10_000);
171+
assertThat(messageListener.optionsPropagatedToSession.sendBufferSizeLimit).isEqualTo(12345);
172+
assertThat(messageListener.optionsPropagatedToSession.sendBufferOverflowStrategy)
173+
.isEqualTo(ConcurrentWebSocketSessionDecorator.OverflowStrategy.DROP);
174+
}
175+
176+
private record OptionsPropagatedToSession(
177+
int sendTimeLimit,
178+
int sendBufferSizeLimit,
179+
ConcurrentWebSocketSessionDecorator.OverflowStrategy sendBufferOverflowStrategy
180+
) {
181+
}
182+
141183
private static class TestWebSocketListener implements WebSocketListener {
142184

143185
public final CountDownLatch messageLatch = new CountDownLatch(1);
144186

187+
public final CountDownLatch sessionStartedLatch = new CountDownLatch(1);
188+
145189
public final CountDownLatch sessionEndedLatch = new CountDownLatch(1);
146190

147191
public WebSocketMessage<?> message;
148192

149193
public boolean started;
150194

195+
@Nullable
196+
public OptionsPropagatedToSession optionsPropagatedToSession;
197+
151198
TestWebSocketListener() {
152199
}
153200

@@ -160,6 +207,15 @@ public void onMessage(WebSocketSession session, WebSocketMessage<?> message) {
160207
@Override
161208
public void afterSessionStarted(WebSocketSession session) {
162209
this.started = true;
210+
211+
var sessionDecorator = (ConcurrentWebSocketSessionDecorator) session;
212+
this.optionsPropagatedToSession = new OptionsPropagatedToSession(
213+
sessionDecorator.getSendTimeLimit(),
214+
sessionDecorator.getBufferSizeLimit(),
215+
TestUtils.getPropertyValue(sessionDecorator, "overflowStrategy", ConcurrentWebSocketSessionDecorator.OverflowStrategy.class)
216+
);
217+
218+
this.sessionStartedLatch.countDown();
163219
}
164220

165221
@Override

0 commit comments

Comments
 (0)