Skip to content

Commit 76b1d1d

Browse files
garyrussellartembilan
authored andcommitted
INT-4307: JMS: Endpoint Stop: Shutdown Container
JIRA: https://jira.spring.io/browse/INT-4307 Shut down the container (to close the connection) when a message-driven endpoint is stopped while the application continues to run.
1 parent 02c8e0a commit 76b1d1d

File tree

7 files changed

+191
-4
lines changed

7 files changed

+191
-4
lines changed

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,6 +48,18 @@ public void setRequestChannel(MessageChannel requestChannel) {
4848
this.endpoint.getListener().setRequestChannel(requestChannel);
4949
}
5050

51+
/**
52+
* Set to false to prevent listener container shutdown when the endpoint is stopped.
53+
* Then, if so configured, any cached consumer(s) in the container will remain.
54+
* Otherwise the shared connection and will be closed and the listener invokers shut
55+
* down; this behavior is new starting with version 5.1. Default: true.
56+
* @param shutdownContainerOnStop false to not shutdown.
57+
* @since 5.1
58+
*/
59+
public void setShutdownContainerOnStop(boolean shutdownContainerOnStop) {
60+
this.endpoint.setShutdownContainerOnStop(shutdownContainerOnStop);
61+
}
62+
5163
@Override
5264
public String getComponentType() {
5365
return this.endpoint.getComponentType();

spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -44,7 +44,11 @@ public class JmsMessageDrivenEndpoint extends MessageProducerSupport implements
4444

4545
private final ChannelPublishingJmsMessageListener listener;
4646

47-
private volatile String sessionAcknowledgeMode;
47+
private String sessionAcknowledgeMode;
48+
49+
private boolean shutdownContainerOnStop = true;
50+
51+
private volatile boolean hasStopped;
4852

4953
/**
5054
* Construct an instance with an externally configured container.
@@ -128,6 +132,18 @@ public void setShouldTrack(boolean shouldTrack) {
128132
this.listener.setShouldTrack(shouldTrack);
129133
}
130134

135+
/**
136+
* Set to false to prevent listener container shutdown when the endpoint is stopped.
137+
* Then, if so configured, any cached consumer(s) in the container will remain.
138+
* Otherwise the shared connection and will be closed and the listener invokers shut
139+
* down; this behavior is new starting with version 5.1. Default: true.
140+
* @param shutdownContainerOnStop false to not shutdown.
141+
* @since 5.1
142+
*/
143+
public void setShutdownContainerOnStop(boolean shutdownContainerOnStop) {
144+
this.shutdownContainerOnStop = shutdownContainerOnStop;
145+
}
146+
131147
public ChannelPublishingJmsMessageListener getListener() {
132148
return this.listener;
133149
}
@@ -177,13 +193,21 @@ protected void onInit() {
177193
protected void doStart() {
178194
this.listener.start();
179195
if (!this.listenerContainer.isRunning()) {
196+
if (this.hasStopped) {
197+
this.listenerContainer.initialize();
198+
this.hasStopped = false;
199+
}
180200
this.listenerContainer.start();
181201
}
182202
}
183203

184204
@Override
185205
protected void doStop() {
186206
this.listenerContainer.stop();
207+
if (this.shutdownContainerOnStop) {
208+
this.hasStopped = true;
209+
this.listenerContainer.shutdown();
210+
}
187211
this.listener.stop();
188212
}
189213

spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsInboundGatewaySpec.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
* @param <S> the target {@link JmsInboundGatewaySpec} implementation type.
3636
*
3737
* @author Artem Bilan
38+
* @author Gary Russell
3839
*
3940
* @since 5.0
4041
*/
@@ -176,6 +177,20 @@ public S extractReplyPayload(boolean extractReplyPayload) {
176177
return _this();
177178
}
178179

180+
/**
181+
* Set to false to prevent listener container shutdown when the endpoint is stopped.
182+
* Then, if so configured, any cached consumer(s) in the container will remain.
183+
* Otherwise the shared connection and will be closed and the listener invokers shut
184+
* down; this behavior is new starting with version 5.1. Default: true.
185+
* @param shutdown false to not shutdown.
186+
* @return the spec.
187+
* @since 5.1
188+
*/
189+
public S shutdownContainerOnStop(boolean shutdown) {
190+
this.target.setShutdownContainerOnStop(shutdown);
191+
return _this();
192+
}
193+
179194
/**
180195
* An {@link AbstractMessageListenerContainer}-based {@link JmsInboundGatewaySpec} extension.
181196
*

spring-integration-jms/src/main/java/org/springframework/integration/jms/dsl/JmsMessageDrivenChannelAdapterSpec.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737
* @param <S> the target {@link JmsMessageDrivenChannelAdapterSpec} implementation type.
3838
*
3939
* @author Artem Bilan
40+
* @author Gary Russell
4041
*
4142
* @since 5.0
4243
*/
@@ -78,6 +79,20 @@ public S extractPayload(boolean extractRequestPayload) {
7879
return _this();
7980
}
8081

82+
/**
83+
* Set to false to prevent listener container shutdown when the endpoint is stopped.
84+
* Then, if so configured, any cached consumer(s) in the container will remain.
85+
* Otherwise the shared connection and will be closed and the listener invokers shut
86+
* down; this behavior is new starting with version 5.1. Default: true.
87+
* @param shutdown false to not shutdown.
88+
* @return the spec.
89+
* @since 5.1
90+
*/
91+
public S shutdownContainerOnStop(boolean shutdown) {
92+
this.target.setShutdownContainerOnStop(shutdown);
93+
return _this();
94+
}
95+
8196
/**
8297
*
8398
* @param <S> the target {@link JmsListenerContainerSpec} implementation type.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.jms.config;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import javax.jms.ConnectionFactory;
22+
23+
import org.apache.activemq.ActiveMQConnectionFactory;
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.beans.factory.annotation.Autowired;
27+
import org.springframework.context.annotation.Bean;
28+
import org.springframework.context.annotation.Configuration;
29+
import org.springframework.integration.channel.QueueChannel;
30+
import org.springframework.integration.config.EnableIntegration;
31+
import org.springframework.integration.jms.ChannelPublishingJmsMessageListener;
32+
import org.springframework.integration.jms.JmsMessageDrivenEndpoint;
33+
import org.springframework.integration.test.util.TestUtils;
34+
import org.springframework.jms.connection.CachingConnectionFactory;
35+
import org.springframework.jms.core.JmsTemplate;
36+
import org.springframework.jms.listener.AbstractMessageListenerContainer;
37+
import org.springframework.jms.listener.DefaultMessageListenerContainer;
38+
import org.springframework.test.annotation.DirtiesContext;
39+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
40+
41+
/**
42+
* @author Gary Russell
43+
* @since 5.1
44+
*
45+
*/
46+
@SpringJUnitConfig
47+
@DirtiesContext
48+
public class JmsMessageDrivenEndpointTests {
49+
50+
@Test
51+
public void testStopStart(@Autowired JmsTemplate template,
52+
@Autowired JmsMessageDrivenEndpoint endpoint, @Autowired QueueChannel out) {
53+
template.convertAndSend("stop.start", "foo");
54+
assertThat(out.receive(10_000).getPayload()).isEqualTo("foo");
55+
endpoint.stop();
56+
assertThat(TestUtils.getPropertyValue(endpoint, "listenerContainer.sharedConnection")).isNull();
57+
endpoint.start();
58+
template.convertAndSend("stop.start", "bar");
59+
assertThat(out.receive(10_000).getPayload()).isEqualTo("bar");
60+
}
61+
62+
@Configuration
63+
@EnableIntegration
64+
public static class Config {
65+
66+
67+
@Bean
68+
public ConnectionFactory cf() {
69+
return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
70+
}
71+
72+
@Bean
73+
public CachingConnectionFactory ccf() {
74+
return new CachingConnectionFactory(cf());
75+
}
76+
77+
@Bean
78+
public JmsTemplate template() {
79+
return new JmsTemplate(ccf());
80+
}
81+
82+
@Bean
83+
public JmsMessageDrivenEndpoint inbound() {
84+
JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
85+
return endpoint;
86+
}
87+
88+
@Bean
89+
public AbstractMessageListenerContainer container() {
90+
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
91+
container.setConnectionFactory(cf());
92+
container.setSessionTransacted(true);
93+
container.setDestinationName("stop.start");
94+
return container;
95+
}
96+
97+
@Bean
98+
public ChannelPublishingJmsMessageListener listener() {
99+
ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
100+
listener.setRequestChannel(out());
101+
return listener;
102+
}
103+
104+
@Bean
105+
public QueueChannel out() {
106+
return new QueueChannel();
107+
}
108+
109+
}
110+
111+
}

src/reference/asciidoc/jms.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,10 @@ When consuming from topics, set the `pub-sub-domain` attribute to true.
133133
Set `subscription-durable` to `true` for a durable subscription or `subscription-shared` for a shared subscription (which requires a JMS 2.0 broker and has been available since version 4.2).
134134
Use `subscription-name` to name the subscription.
135135

136+
Starting with version 5.1, when the endpoint is stopped while the application remains running, the underlying listener container is shut down, closing its shared connection and consumers.
137+
Previously, the connection and consumers remained open.
138+
To revert to the previous behavior, set the `shutdownContainerOnStop` on the `JmsMessageDrivenEndpoint` to `false`.
139+
136140
[[jms-md-conversion-errors]]
137141
==== Inbound Conversion Errors
138142

@@ -249,6 +253,10 @@ IMPORTANT: Starting with version 4.2, the default `acknowledge` mode is `transac
249253
In that case, you should configure the container as needed.
250254
We recommend that you use `transacted` with the `DefaultMessageListenerContainer` to avoid message loss.
251255

256+
Starting with version 5.1, when the endpoint is stopped while the application remains running, the underlying listener container is shut down, closing its shared connection and consumers.
257+
Previously, the connection and consumers remained open.
258+
To revert to the previous behavior, set the `shutdownContainerOnStop` on the `JmsInboundGateway` to `false`.
259+
252260
[[jms-outbound-gateway]]
253261
=== Outbound Gateway
254262

src/reference/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,5 +161,7 @@ See https://github.com/spring-projects/spring-integration-extensions/tree/master
161161

162162
The `JmsSendingMessageHandler` now provides `deliveryModeExpression` and `timeToLiveExpression` options to determine respective QoS options for JMS message to send at runtime.
163163
The `DefaultJmsHeaderMapper` now allows to map inbound `JMSDeliveryMode` and `JMSExpiration` properties via setting to `true` respective `setMapInboundDeliveryMode()` and `setMapInboundExpiration()` options.
164+
When a `JmsMessageDrivenEndpoint` or `JmsInboundGateway` is stopped, the associated listener container is now shut down; this closes its shared connection and any consumers.
165+
You can configure the endpoints to revert to the previous behavior.
164166

165167
See <<jms>> for more information.

0 commit comments

Comments
 (0)