Skip to content

Commit 8b4f60b

Browse files
committed
Support receipt on DISCONNECT with simple broker
Issue: SPR-14568
1 parent 3b95e0b commit 8b4f60b

File tree

5 files changed

+80
-10
lines changed

5 files changed

+80
-10
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessageHeaderAccessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
6565

6666
public static final String CONNECT_MESSAGE_HEADER = "simpConnectMessage";
6767

68+
public static final String DISCONNECT_MESSAGE_HEADER = "simpDisconnectMessage";
69+
6870
public static final String HEART_BEAT_HEADER = "simpHeartbeat";
6971

7072

spring-messaging/src/main/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ else if (SimpMessageType.CONNECT.equals(messageType)) {
275275
}
276276
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
277277
logMessage(message);
278-
handleDisconnect(sessionId, user);
278+
handleDisconnect(sessionId, user, message);
279279
}
280280
else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
281281
logMessage(message);
@@ -310,12 +310,15 @@ private void initHeaders(SimpMessageHeaderAccessor accessor) {
310310
}
311311
}
312312

313-
private void handleDisconnect(String sessionId, Principal user) {
313+
private void handleDisconnect(String sessionId, Principal user, Message<?> origMessage) {
314314
this.sessions.remove(sessionId);
315315
this.subscriptionRegistry.unregisterAllSubscriptions(sessionId);
316316
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
317317
accessor.setSessionId(sessionId);
318318
accessor.setUser(user);
319+
if (origMessage != null) {
320+
accessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, origMessage);
321+
}
319322
initHeaders(accessor);
320323
Message<byte[]> message = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
321324
getClientOutboundChannel().send(message);
@@ -432,7 +435,7 @@ public void run() {
432435
long now = System.currentTimeMillis();
433436
for (SessionInfo info : sessions.values()) {
434437
if (info.getReadInterval() > 0 && (now - info.getLastReadTime()) > info.getReadInterval()) {
435-
handleDisconnect(info.getSessiondId(), info.getUser());
438+
handleDisconnect(info.getSessiondId(), info.getUser(), null);
436439
}
437440
if (info.getWriteInterval() > 0 && (now - info.getLastWriteTime()) > info.getWriteInterval()) {
438441
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT);

spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2016 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,9 +16,6 @@
1616

1717
package org.springframework.messaging.simp.broker;
1818

19-
import static org.junit.Assert.*;
20-
import static org.mockito.Mockito.*;
21-
2219
import java.security.Principal;
2320
import java.util.Collections;
2421
import java.util.List;
@@ -41,6 +38,21 @@
4138
import org.springframework.messaging.support.MessageBuilder;
4239
import org.springframework.scheduling.TaskScheduler;
4340

41+
import static org.junit.Assert.assertArrayEquals;
42+
import static org.junit.Assert.assertEquals;
43+
import static org.junit.Assert.assertNotNull;
44+
import static org.junit.Assert.assertNull;
45+
import static org.junit.Assert.assertSame;
46+
import static org.junit.Assert.assertTrue;
47+
import static org.mockito.Mockito.any;
48+
import static org.mockito.Mockito.atLeast;
49+
import static org.mockito.Mockito.eq;
50+
import static org.mockito.Mockito.mock;
51+
import static org.mockito.Mockito.times;
52+
import static org.mockito.Mockito.verify;
53+
import static org.mockito.Mockito.verifyNoMoreInteractions;
54+
import static org.mockito.Mockito.when;
55+
4456
/**
4557
* Unit tests for SimpleBrokerMessageHandler.
4658
*
@@ -72,7 +84,7 @@ public class SimpleBrokerMessageHandlerTests {
7284
public void setup() {
7385
MockitoAnnotations.initMocks(this);
7486
this.messageHandler = new SimpleBrokerMessageHandler(this.clientInboundChannel,
75-
this.clientOutboundChannel, this.brokerChannel, Collections.<String>emptyList());
87+
this.clientOutboundChannel, this.brokerChannel, Collections.emptyList());
7688
}
7789

7890

@@ -130,6 +142,7 @@ public void subcribeDisconnectPublish() {
130142

131143
Message<?> captured = this.messageCaptor.getAllValues().get(0);
132144
assertEquals(SimpMessageType.DISCONNECT_ACK, SimpMessageHeaderAccessor.getMessageType(captured.getHeaders()));
145+
assertSame(message, captured.getHeaders().get(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER));
133146
assertEquals(sess1, SimpMessageHeaderAccessor.getSessionId(captured.getHeaders()));
134147
assertEquals("joe", SimpMessageHeaderAccessor.getUser(captured.getHeaders()).getName());
135148

spring-websocket/src/main/java/org/springframework/web/socket/messaging/StompSubProtocolHandler.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.context.ApplicationEventPublisherAware;
3535
import org.springframework.messaging.Message;
3636
import org.springframework.messaging.MessageChannel;
37+
import org.springframework.messaging.MessageHeaders;
3738
import org.springframework.messaging.simp.SimpAttributes;
3839
import org.springframework.messaging.simp.SimpAttributesContextHolder;
3940
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
@@ -449,8 +450,15 @@ else if (accessor instanceof SimpMessageHeaderAccessor) {
449450
stompAccessor = convertConnectAcktoStompConnected(stompAccessor);
450451
}
451452
else if (SimpMessageType.DISCONNECT_ACK.equals(messageType)) {
452-
stompAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
453-
stompAccessor.setMessage("Session closed.");
453+
String receipt = getDisconnectReceipt(stompAccessor);
454+
if (receipt != null) {
455+
stompAccessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
456+
stompAccessor.setReceiptId(receipt);
457+
}
458+
else {
459+
stompAccessor = StompHeaderAccessor.create(StompCommand.ERROR);
460+
stompAccessor.setMessage("Session closed.");
461+
}
454462
}
455463
else if (SimpMessageType.HEARTBEAT.equals(messageType)) {
456464
stompAccessor = StompHeaderAccessor.createForHeartbeat();
@@ -503,6 +511,16 @@ else if (!acceptVersions.isEmpty()) {
503511
return connectedHeaders;
504512
}
505513

514+
private String getDisconnectReceipt(SimpMessageHeaderAccessor simpHeaders) {
515+
String name = StompHeaderAccessor.DISCONNECT_MESSAGE_HEADER;
516+
Message<?> message = (Message<?>) simpHeaders.getHeader(name);
517+
if (message != null) {
518+
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
519+
return accessor.getReceipt();
520+
}
521+
return null;
522+
}
523+
506524
protected StompHeaderAccessor toMutableAccessor(StompHeaderAccessor headerAccessor, Message<?> message) {
507525
return (headerAccessor.isMutable() ? headerAccessor : StompHeaderAccessor.wrap(message));
508526
}

spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompSubProtocolHandlerTests.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,40 @@ public void handleMessageToClientWithSimpConnectAckDefaultHeartBeat() {
169169
"user-name:joe\n" + "\n" + "\u0000", actual.getPayload());
170170
}
171171

172+
@Test
173+
public void handleMessageToClientWithSimpDisconnectAck() {
174+
175+
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
176+
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
177+
178+
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
179+
ackAccessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, connectMessage);
180+
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
181+
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
182+
183+
assertEquals(1, this.session.getSentMessages().size());
184+
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
185+
assertEquals("ERROR\n" + "message:Session closed.\n" + "content-length:0\n" +
186+
"\n\u0000", actual.getPayload());
187+
}
188+
189+
@Test
190+
public void handleMessageToClientWithSimpDisconnectAckAndReceipt() {
191+
192+
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.DISCONNECT);
193+
accessor.setReceipt("message-123");
194+
Message<?> connectMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
195+
196+
SimpMessageHeaderAccessor ackAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT_ACK);
197+
ackAccessor.setHeader(SimpMessageHeaderAccessor.DISCONNECT_MESSAGE_HEADER, connectMessage);
198+
Message<byte[]> ackMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, ackAccessor.getMessageHeaders());
199+
this.protocolHandler.handleMessageToClient(this.session, ackMessage);
200+
201+
assertEquals(1, this.session.getSentMessages().size());
202+
TextMessage actual = (TextMessage) this.session.getSentMessages().get(0);
203+
assertEquals("RECEIPT\n" + "receipt-id:message-123\n" + "\n\u0000", actual.getPayload());
204+
}
205+
172206
@Test
173207
public void handleMessageToClientWithSimpHeartbeat() {
174208

0 commit comments

Comments
 (0)