16
16
17
17
package org .springframework .messaging .simp .stomp ;
18
18
19
+ import java .net .URI ;
19
20
import java .nio .charset .StandardCharsets ;
20
21
import java .util .ArrayList ;
21
22
import java .util .Arrays ;
26
27
import java .util .concurrent .TimeUnit ;
27
28
28
29
import org .apache .activemq .broker .BrokerService ;
30
+ import org .apache .activemq .broker .TransportConnector ;
29
31
import org .apache .commons .logging .Log ;
30
32
import org .apache .commons .logging .LogFactory ;
31
33
import org .junit .jupiter .api .AfterEach ;
@@ -70,15 +72,15 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
70
72
71
73
private TestEventPublisher eventPublisher ;
72
74
73
- private int port ;
75
+ // initial value of zero implies that a random ephemeral port should be used
76
+ private int port = 0 ;
74
77
75
78
76
79
@ BeforeEach
77
80
@ SuppressWarnings ("deprecation" )
78
81
public void setup (TestInfo testInfo ) throws Exception {
79
82
logger .debug ("Setting up before '" + testInfo .getTestMethod ().get ().getName () + "'" );
80
83
81
- this .port = org .springframework .util .SocketUtils .findAvailableTcpPort (61613 );
82
84
this .responseChannel = new ExecutorSubscribableChannel ();
83
85
this .responseHandler = new TestMessageHandler ();
84
86
this .responseChannel .subscribe (this .responseHandler );
@@ -88,14 +90,25 @@ public void setup(TestInfo testInfo) throws Exception {
88
90
}
89
91
90
92
private void startActiveMQBroker () throws Exception {
93
+ TransportConnector stompConnector = createStompConnector (this .port );
91
94
this .activeMQBroker = new BrokerService ();
92
- this .activeMQBroker .addConnector ("stomp://localhost:" + this . port );
95
+ this .activeMQBroker .addConnector (stompConnector );
93
96
this .activeMQBroker .setStartAsync (false );
94
97
this .activeMQBroker .setPersistent (false );
95
98
this .activeMQBroker .setUseJmx (false );
96
99
this .activeMQBroker .getSystemUsage ().getMemoryUsage ().setLimit (1024 * 1024 * 5 );
97
100
this .activeMQBroker .getSystemUsage ().getTempUsage ().setLimit (1024 * 1024 * 5 );
98
101
this .activeMQBroker .start ();
102
+
103
+ // Reuse existing ephemeral port on restart (i.e., the next time this method
104
+ // is invoked) since it will already be configured in the relay
105
+ this .port = (this .port != 0 ? this .port : stompConnector .getServer ().getSocketAddress ().getPort ());
106
+ }
107
+
108
+ private TransportConnector createStompConnector (int port ) throws Exception {
109
+ TransportConnector connector = new TransportConnector ();
110
+ connector .setUri (new URI ("stomp://localhost:" + port ));
111
+ return connector ;
99
112
}
100
113
101
114
private void createAndStartRelay () throws InterruptedException {
0 commit comments