Skip to content

Commit c854e35

Browse files
committed
Merge branch '5.3.x'
2 parents a085a1b + 4e97776 commit c854e35

File tree

9 files changed

+134
-82
lines changed

9 files changed

+134
-82
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/DefaultStompSession.java

Lines changed: 49 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.lang.reflect.Type;
2020
import java.time.Instant;
2121
import java.util.ArrayList;
22-
import java.util.Collections;
2322
import java.util.List;
2423
import java.util.Map;
2524
import java.util.concurrent.CompletableFuture;
@@ -28,6 +27,7 @@
2827
import java.util.concurrent.ScheduledFuture;
2928
import java.util.concurrent.TimeUnit;
3029
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.function.Consumer;
3131

3232
import org.apache.commons.logging.Log;
3333

@@ -439,7 +439,7 @@ else if (logger.isDebugEnabled()) {
439439
String receiptId = headers.getReceiptId();
440440
ReceiptHandler handler = this.receiptHandlers.get(receiptId);
441441
if (handler != null) {
442-
handler.handleReceiptReceived();
442+
handler.handleReceiptReceived(headers);
443443
}
444444
else if (logger.isDebugEnabled()) {
445445
logger.debug("No matching receipt: " + accessor.getDetailedLogMessage(message.getPayload()));
@@ -544,7 +544,7 @@ private class ReceiptHandler implements Receiptable {
544544
@Nullable
545545
private final String receiptId;
546546

547-
private final List<Runnable> receiptCallbacks = new ArrayList<>(2);
547+
private final List<Consumer<StompHeaders>> receiptCallbacks = new ArrayList<>(2);
548548

549549
private final List<Runnable> receiptLostCallbacks = new ArrayList<>(2);
550550

@@ -554,6 +554,9 @@ private class ReceiptHandler implements Receiptable {
554554
@Nullable
555555
private Boolean result;
556556

557+
@Nullable
558+
private StompHeaders receiptHeaders;
559+
557560
public ReceiptHandler(@Nullable String receiptId) {
558561
this.receiptId = receiptId;
559562
if (receiptId != null) {
@@ -576,64 +579,80 @@ public String getReceiptId() {
576579

577580
@Override
578581
public void addReceiptTask(Runnable task) {
579-
addTask(task, true);
582+
addReceiptTask(headers -> task.run());
580583
}
581584

582585
@Override
583-
public void addReceiptLostTask(Runnable task) {
584-
addTask(task, false);
585-
}
586-
587-
private void addTask(Runnable task, boolean successTask) {
588-
Assert.notNull(this.receiptId,
589-
"To track receipts, set autoReceiptEnabled=true or add 'receiptId' header");
586+
public void addReceiptTask(Consumer<StompHeaders> task) {
587+
Assert.notNull(this.receiptId, "Set autoReceiptEnabled to track receipts or add a 'receiptId' header");
590588
synchronized (this) {
591-
if (this.result != null && this.result == successTask) {
592-
invoke(Collections.singletonList(task));
589+
if (this.result != null) {
590+
if (this.result) {
591+
task.accept(this.receiptHeaders);
592+
}
593593
}
594594
else {
595-
if (successTask) {
596-
this.receiptCallbacks.add(task);
597-
}
598-
else {
599-
this.receiptLostCallbacks.add(task);
600-
}
595+
this.receiptCallbacks.add(task);
601596
}
602597
}
603598
}
604599

605-
private void invoke(List<Runnable> callbacks) {
606-
for (Runnable runnable : callbacks) {
607-
try {
608-
runnable.run();
600+
@Override
601+
public void addReceiptLostTask(Runnable task) {
602+
synchronized (this) {
603+
if (this.result != null) {
604+
if (!this.result) {
605+
task.run();
606+
}
609607
}
610-
catch (Throwable ex) {
611-
// ignore
608+
else {
609+
this.receiptLostCallbacks.add(task);
612610
}
613611
}
614612
}
615613

616-
public void handleReceiptReceived() {
617-
handleInternal(true);
614+
public void handleReceiptReceived(StompHeaders receiptHeaders) {
615+
handleInternal(true, receiptHeaders);
618616
}
619617

620618
public void handleReceiptNotReceived() {
621-
handleInternal(false);
619+
handleInternal(false, null);
622620
}
623621

624-
private void handleInternal(boolean result) {
622+
private void handleInternal(boolean result, @Nullable StompHeaders receiptHeaders) {
625623
synchronized (this) {
626624
if (this.result != null) {
627625
return;
628626
}
629627
this.result = result;
630-
invoke(result ? this.receiptCallbacks : this.receiptLostCallbacks);
628+
this.receiptHeaders = receiptHeaders;
629+
if (result) {
630+
this.receiptCallbacks.forEach(consumer -> {
631+
try {
632+
consumer.accept(this.receiptHeaders);
633+
}
634+
catch (Throwable ex) {
635+
// ignore
636+
}
637+
});
638+
}
639+
else {
640+
this.receiptLostCallbacks.forEach(task -> {
641+
try {
642+
task.run();
643+
}
644+
catch (Throwable ex) {
645+
// ignore
646+
}
647+
});
648+
}
631649
DefaultStompSession.this.receiptHandlers.remove(this.receiptId);
632650
if (this.future != null) {
633651
this.future.cancel(true);
634652
}
635653
}
636654
}
655+
637656
}
638657

639658

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompSession.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.messaging.simp.stomp;
1818

19+
import java.util.function.Consumer;
20+
1921
import org.springframework.lang.Nullable;
2022

2123
/**
@@ -139,16 +141,27 @@ interface Receiptable {
139141

140142
/**
141143
* Task to invoke when a receipt is received.
144+
* @param task the task to invoke
145+
* @throws java.lang.IllegalArgumentException if the receiptId is {@code null}
146+
*/
147+
void addReceiptTask(Runnable task);
148+
149+
/**
150+
* Variant of {@link #addReceiptTask(Runnable)} with a {@link Consumer}
151+
* of the headers from the {@code RECEIPT} frame.
152+
* @param task the consumer to invoke
142153
* @throws java.lang.IllegalArgumentException if the receiptId is {@code null}
154+
* @since 5.3.23
143155
*/
144-
void addReceiptTask(Runnable runnable);
156+
void addReceiptTask(Consumer<StompHeaders> task);
145157

146158
/**
147159
* Task to invoke when a receipt is not received in the configured time.
160+
* @param task the task to invoke
148161
* @throws java.lang.IllegalArgumentException if the receiptId is {@code null}
149162
* @see org.springframework.messaging.simp.stomp.StompClientSupport#setReceiptTimeLimit(long)
150163
*/
151-
void addReceiptLostTask(Runnable runnable);
164+
void addReceiptLostTask(Runnable task);
152165
}
153166

154167

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/DefaultStompSessionTests.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -575,22 +575,30 @@ public void receiptReceived() {
575575
this.session.setTaskScheduler(mock(TaskScheduler.class));
576576

577577
AtomicReference<Boolean> received = new AtomicReference<>();
578+
AtomicReference<StompHeaders> receivedHeaders = new AtomicReference<>();
578579

579580
StompHeaders headers = new StompHeaders();
580581
headers.setDestination("/topic/foo");
581582
headers.setReceipt("my-receipt");
582583
Subscription subscription = this.session.subscribe(headers, mock(StompFrameHandler.class));
583-
subscription.addReceiptTask(() -> received.set(true));
584+
subscription.addReceiptTask(receiptHeaders -> {
585+
received.set(true);
586+
receivedHeaders.set(receiptHeaders);
587+
});
584588

585589
assertThat((Object) received.get()).isNull();
586590

587591
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
588592
accessor.setReceiptId("my-receipt");
593+
accessor.setNativeHeader("foo", "bar");
589594
accessor.setLeaveMutable(true);
590595
this.session.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders()));
591596

592597
assertThat(received.get()).isNotNull();
593598
assertThat(received.get()).isTrue();
599+
assertThat(receivedHeaders.get()).isNotNull();
600+
assertThat(receivedHeaders.get().get("foo").size()).isEqualTo(1);
601+
assertThat(receivedHeaders.get().get("foo").get(0)).isEqualTo("bar");
594602
}
595603

596604
@Test
@@ -599,6 +607,7 @@ public void receiptReceivedBeforeTaskAdded() {
599607
this.session.setTaskScheduler(mock(TaskScheduler.class));
600608

601609
AtomicReference<Boolean> received = new AtomicReference<>();
610+
AtomicReference<StompHeaders> receivedHeaders = new AtomicReference<>();
602611

603612
StompHeaders headers = new StompHeaders();
604613
headers.setDestination("/topic/foo");
@@ -607,13 +616,20 @@ public void receiptReceivedBeforeTaskAdded() {
607616

608617
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.RECEIPT);
609618
accessor.setReceiptId("my-receipt");
619+
accessor.setNativeHeader("foo", "bar");
610620
accessor.setLeaveMutable(true);
611621
this.session.handleMessage(MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders()));
612622

613-
subscription.addReceiptTask(() -> received.set(true));
623+
subscription.addReceiptTask(receiptHeaders -> {
624+
received.set(true);
625+
receivedHeaders.set(receiptHeaders);
626+
});
614627

615628
assertThat(received.get()).isNotNull();
616629
assertThat(received.get()).isTrue();
630+
assertThat(receivedHeaders.get()).isNotNull();
631+
assertThat(receivedHeaders.get().get("foo").size()).isEqualTo(1);
632+
assertThat(receivedHeaders.get().get("foo").get(0)).isEqualTo("bar");
617633
}
618634

619635
@Test

spring-websocket/src/main/java/org/springframework/web/socket/client/ConnectionManagerSupport.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
import org.springframework.web.util.UriComponentsBuilder;
2626

2727
/**
28-
* A base class for WebSocket connection managers. Provides a declarative style of
29-
* connecting to a WebSocket server given a URI to connect to. The connection occurs when
30-
* the Spring ApplicationContext is refreshed, if the {@link #autoStartup} property is set
31-
* to {@code true}, or if set to {@code false}, the {@link #start()} and #stop methods can
32-
* be invoked manually.
28+
* Base class for a connection manager that automates the process of connecting
29+
* to a WebSocket server with the Spring ApplicationContext lifecycle. Connects
30+
* to a WebSocket server on {@link #start()} and disconnects on {@link #stop()}.
31+
* If {@link #setAutoStartup(boolean)} is set to {@code true} this will be done
32+
* automatically when the Spring {@code ApplicationContext} is refreshed.
3333
*
3434
* @author Rossen Stoyanchev
3535
* @since 4.0
@@ -163,11 +163,19 @@ public boolean isRunning() {
163163
return this.running;
164164
}
165165

166+
/**
167+
* Whether the connection is open/{@code true} or closed/{@code false}.
168+
*/
169+
public abstract boolean isConnected();
166170

171+
/**
172+
* Subclasses implement this to actually establish the connection.
173+
*/
167174
protected abstract void openConnection();
168175

176+
/**
177+
* Subclasses implement this to close the connection.
178+
*/
169179
protected abstract void closeConnection() throws Exception;
170180

171-
protected abstract boolean isConnected();
172-
173181
}

spring-websocket/src/main/java/org/springframework/web/socket/client/WebSocketConnectionManager.java

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,9 @@
2828
import org.springframework.web.socket.handler.LoggingWebSocketHandlerDecorator;
2929

3030
/**
31-
* A WebSocket connection manager that is given a URI, a {@link WebSocketClient}, and a
32-
* {@link WebSocketHandler}, connects to a WebSocket server through {@link #start()} and
33-
* {@link #stop()} methods. If {@link #setAutoStartup(boolean)} is set to {@code true}
34-
* this will be done automatically when the Spring ApplicationContext is refreshed.
31+
* WebSocket {@link ConnectionManagerSupport connection manager} that connects
32+
* to the server via {@link WebSocketClient} and handles the session with a
33+
* {@link WebSocketHandler}.
3534
*
3635
* @author Rossen Stoyanchev
3736
* @author Sam Brannen
@@ -58,14 +57,6 @@ public WebSocketConnectionManager(WebSocketClient client,
5857
}
5958

6059

61-
/**
62-
* Decorate the WebSocketHandler provided to the class constructor.
63-
* <p>By default {@link LoggingWebSocketHandlerDecorator} is added.
64-
*/
65-
protected WebSocketHandler decorateWebSocketHandler(WebSocketHandler handler) {
66-
return new LoggingWebSocketHandlerDecorator(handler);
67-
}
68-
6960
/**
7061
* Set the sub-protocols to use. If configured, specified sub-protocols will be
7162
* requested in the handshake through the {@code Sec-WebSocket-Protocol} header. The
@@ -130,6 +121,11 @@ public void stopInternal() throws Exception {
130121
super.stopInternal();
131122
}
132123

124+
@Override
125+
public boolean isConnected() {
126+
return (this.webSocketSession != null && this.webSocketSession.isOpen());
127+
}
128+
133129
@Override
134130
protected void openConnection() {
135131
if (logger.isInfoEnabled()) {
@@ -157,9 +153,12 @@ protected void closeConnection() throws Exception {
157153
}
158154
}
159155

160-
@Override
161-
protected boolean isConnected() {
162-
return (this.webSocketSession != null && this.webSocketSession.isOpen());
156+
/**
157+
* Decorate the WebSocketHandler provided to the class constructor.
158+
* <p>By default {@link LoggingWebSocketHandlerDecorator} is added.
159+
*/
160+
protected WebSocketHandler decorateWebSocketHandler(WebSocketHandler handler) {
161+
return new LoggingWebSocketHandlerDecorator(handler);
163162
}
164163

165164
}

0 commit comments

Comments
 (0)