Skip to content

Commit 4581724

Browse files
committed
spring-projectsGH-3326: TCP: Support Unsolicited Server Messages
Resolves spring-projects#3326 - OB Gateway - send unsolicited messages and late replies to a channel - Support multiple `TcpSender`s
1 parent 381a071 commit 4581724

File tree

10 files changed

+225
-73
lines changed

10 files changed

+225
-73
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ public TcpOutboundGatewaySpec async(boolean async) {
114114
return this;
115115
}
116116

117+
public TcpOutboundGatewaySpec unsolictedMessageChannelName(String channelName) {
118+
this.target.setUnsolicitedMessageChannelName(channelName);
119+
return this;
120+
}
121+
117122
@Override
118123
public Map<Object, String> getComponentsToRegister() {
119124
return this.connectionFactory != null

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/TcpOutboundGateway.java

Lines changed: 97 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,17 @@ public class TcpOutboundGateway extends AbstractReplyProducingMessageHandler
9191

9292
private boolean closeStreamAfterSend;
9393

94+
private String unsolicitedMessageChannelName;
95+
96+
private MessageChannel unsolicitedMessageChannel;
97+
98+
public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) {
99+
this.connectionFactory = connectionFactory;
100+
connectionFactory.registerListener(this);
101+
connectionFactory.registerSender(this);
102+
this.isSingleUse = connectionFactory.isSingleUse();
103+
}
104+
94105
/**
95106
* @param requestTimeout the requestTimeout to set
96107
*/
@@ -118,14 +129,53 @@ public void setIntegrationEvaluationContext(EvaluationContext evaluationContext)
118129
this.evaluationContextSet = true;
119130
}
120131

121-
@Override
122-
protected void doInit() {
123-
super.doInit();
124-
if (!this.evaluationContextSet) {
125-
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
126-
}
127-
Assert.state(!this.closeStreamAfterSend || this.isSingleUse,
128-
"Single use connection needed with closeStreamAfterSend");
132+
/**
133+
* Specify the Spring Integration reply channel. If this property is not
134+
* set the gateway will check for a 'replyChannel' header on the request.
135+
* @param replyChannel The reply channel.
136+
*/
137+
public void setReplyChannel(MessageChannel replyChannel) {
138+
this.setOutputChannel(replyChannel);
139+
}
140+
141+
/**
142+
* Specify the Spring Integration reply channel name. If this property is not
143+
* set the gateway will check for a 'replyChannel' header on the request.
144+
* @param replyChannel The reply channel.
145+
* @since 5.0
146+
*/
147+
public void setReplyChannelName(String replyChannel) {
148+
this.setOutputChannelName(replyChannel);
149+
}
150+
151+
/**
152+
* Set the channel name for unsolicited incoming messages, or late replies.
153+
* @param unsolicitedMessageChannelName the channel name.
154+
* @since 5.4
155+
*/
156+
public void setUnsolicitedMessageChannelName(String unsolicitedMessageChannelName) {
157+
this.unsolicitedMessageChannelName = unsolicitedMessageChannelName;
158+
}
159+
160+
/**
161+
* Set the channel for unsolicited incoming messages, or late replies.
162+
* @param unsolicitedMessageChannel the channel.
163+
* @since 5.4
164+
*/
165+
public void setUnsolicitedMessageChannel(MessageChannel unsolicitedMessageChannel) {
166+
this.unsolicitedMessageChannel = unsolicitedMessageChannel;
167+
}
168+
169+
/**
170+
* Set to true to close the connection ouput stream after sending without
171+
* closing the connection. Use to signal EOF to the server, such as when using
172+
* a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}.
173+
* Requires a single-use connection factory.
174+
* @param closeStreamAfterSend true to close.
175+
* @since 5.2
176+
*/
177+
public void setCloseStreamAfterSend(boolean closeStreamAfterSend) {
178+
this.closeStreamAfterSend = closeStreamAfterSend;
129179
}
130180

131181
/**
@@ -140,6 +190,21 @@ public void setSecondChanceDelay(int secondChanceDelay) {
140190
this.secondChanceDelay = secondChanceDelay;
141191
}
142192

193+
@Override
194+
public String getComponentType() {
195+
return "ip:tcp-outbound-gateway";
196+
}
197+
198+
@Override
199+
protected void doInit() {
200+
super.doInit();
201+
if (!this.evaluationContextSet) {
202+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
203+
}
204+
Assert.state(!this.closeStreamAfterSend || this.isSingleUse,
205+
"Single use connection needed with closeStreamAfterSend");
206+
}
207+
143208
@Override
144209
protected Object handleRequestMessage(Message<?> requestMessage) {
145210
Assert.notNull(this.connectionFactory, this.getClass().getName() +
@@ -260,6 +325,9 @@ private void cleanUp(boolean haveSemaphore, TcpConnection connection, String con
260325
public boolean onMessage(Message<?> message) {
261326
String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class);
262327
if (connectionId == null) {
328+
if (unsolicitedSupported(message)) {
329+
return false;
330+
}
263331
logger.error("Cannot correlate response - no connection id");
264332
publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id");
265333
return false;
@@ -277,6 +345,9 @@ public boolean onMessage(Message<?> message) {
277345
return false;
278346
}
279347
else {
348+
if (unsolicitedSupported(message)) {
349+
return false;
350+
}
280351
String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
281352
logger.error(errorMessage);
282353
publishNoConnectionEvent(message, connectionId, errorMessage);
@@ -293,6 +364,24 @@ public boolean onMessage(Message<?> message) {
293364
return false;
294365
}
295366

367+
private boolean unsolicitedSupported(Message<?> message) {
368+
String channelName = this.unsolicitedMessageChannelName;
369+
if (channelName != null) {
370+
this.unsolicitedMessageChannel = getChannelResolver().resolveDestination(channelName);
371+
this.unsolicitedMessageChannelName = null;
372+
}
373+
if (this.unsolicitedMessageChannel != null) {
374+
try {
375+
this.messagingTemplate.send(this.unsolicitedMessageChannel, message);
376+
}
377+
catch (Exception e) {
378+
logger.error("Failed to send unsolicited message " + message, e);
379+
}
380+
return true;
381+
}
382+
return false;
383+
}
384+
296385
private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) {
297386
ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher();
298387
if (applicationEventPublisher != null) {
@@ -301,13 +390,6 @@ private void publishNoConnectionEvent(Message<?> message, String connectionId, S
301390
}
302391
}
303392

304-
public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) {
305-
this.connectionFactory = connectionFactory;
306-
connectionFactory.registerListener(this);
307-
connectionFactory.registerSender(this);
308-
this.isSingleUse = connectionFactory.isSingleUse();
309-
}
310-
311393
@Override
312394
public void addNewConnection(TcpConnection connection) {
313395
// do nothing - no asynchronous multiplexing supported
@@ -318,42 +400,6 @@ public void removeDeadConnection(TcpConnection connection) {
318400
// do nothing - no asynchronous multiplexing supported
319401
}
320402

321-
/**
322-
* Specify the Spring Integration reply channel. If this property is not
323-
* set the gateway will check for a 'replyChannel' header on the request.
324-
* @param replyChannel The reply channel.
325-
*/
326-
public void setReplyChannel(MessageChannel replyChannel) {
327-
this.setOutputChannel(replyChannel);
328-
}
329-
330-
/**
331-
* Specify the Spring Integration reply channel name. If this property is not
332-
* set the gateway will check for a 'replyChannel' header on the request.
333-
* @param replyChannel The reply channel.
334-
* @since 5.0
335-
*/
336-
public void setReplyChannelName(String replyChannel) {
337-
this.setOutputChannelName(replyChannel);
338-
}
339-
340-
/**
341-
* Set to true to close the connection ouput stream after sending without
342-
* closing the connection. Use to signal EOF to the server, such as when using
343-
* a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}.
344-
* Requires a single-use connection factory.
345-
* @param closeStreamAfterSend true to close.
346-
* @since 5.2
347-
*/
348-
public void setCloseStreamAfterSend(boolean closeStreamAfterSend) {
349-
this.closeStreamAfterSend = closeStreamAfterSend;
350-
}
351-
352-
@Override
353-
public String getComponentType() {
354-
return "ip:tcp-outbound-gateway";
355-
}
356-
357403
@Override
358404
public void start() {
359405
this.connectionFactory.start();

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractClientConnectionFactory.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,7 @@ protected void initializeConnection(TcpConnectionSupport connection, Socket sock
218218
connection.registerListener(listener);
219219
}
220220
}
221-
TcpSender sender = getSender();
222-
if (sender != null) {
223-
connection.registerSender(sender);
224-
}
221+
connection.registerSenders(getSenders());
225222
connection.setMapper(getMapper());
226223
connection.setDeserializer(getDeserializer());
227224
connection.setSerializer(getSerializer());

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ public abstract class AbstractConnectionFactory extends IntegrationObjectSupport
7979

8080
private final BlockingQueue<PendingIO> delayedReads = new LinkedBlockingQueue<>();
8181

82+
private final List<TcpSender> senders = Collections.synchronizedList(new ArrayList<>());
83+
8284
private String host;
8385

8486
private int port;
8587

8688
private TcpListener listener;
8789

88-
private TcpSender sender;
89-
9090
private int soTimeout = -1;
9191

9292
private int soSendBufferSize;
@@ -326,11 +326,20 @@ public TcpListener getListener() {
326326
}
327327

328328
/**
329-
* @return the sender
329+
* @return the first sender, if present.
330330
*/
331331
@Nullable
332332
public TcpSender getSender() {
333-
return this.sender;
333+
return this.senders.size() > 0 ? this.senders.get(0) : null;
334+
}
335+
336+
/**
337+
* Return the list of senders.
338+
* @return the senders.
339+
* @since 5.4
340+
*/
341+
public List<TcpSender> getSenders() {
342+
return Collections.unmodifiableList(this.senders);
334343
}
335344

336345
/**
@@ -372,8 +381,16 @@ public void registerListener(TcpListener listenerToRegister) {
372381
* @param senderToRegister The sender
373382
*/
374383
public void registerSender(TcpSender senderToRegister) {
375-
Assert.isNull(this.sender, this.getClass().getName() + " may only be used by one outbound adapter");
376-
this.sender = senderToRegister;
384+
this.senders.add(senderToRegister);
385+
}
386+
387+
/**
388+
* Unregister a TcpSender.
389+
* @param sender the sender.
390+
* @return true if the sender was registered.
391+
*/
392+
public boolean unregisterSender(TcpSender sender) {
393+
return this.senders.remove(sender);
377394
}
378395

379396
/**
@@ -600,7 +617,7 @@ protected TcpConnectionSupport wrapConnection(TcpConnectionSupport connectionArg
600617
if (this.listener == null) {
601618
connection.registerListener(wrapper);
602619
}
603-
if (this.sender == null) {
620+
if (this.senders.size() == 0) {
604621
connection.registerSender(wrapper);
605622
}
606623
connection = wrapper;

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractServerConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ protected void initializeConnection(TcpConnectionSupport connection, Socket sock
127127
if (listener != null) {
128128
connection.registerListener(listener);
129129
}
130-
connection.registerSender(getSender());
130+
connection.registerSenders(getSenders());
131131
connection.setMapper(getMapper());
132132
connection.setDeserializer(getDeserializer());
133133
connection.setSerializer(getSerializer());

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.net.InetAddress;
2020
import java.net.Socket;
21+
import java.util.ArrayList;
2122
import java.util.Collections;
23+
import java.util.List;
2224
import java.util.Map;
2325
import java.util.UUID;
2426
import java.util.concurrent.CountDownLatch;
@@ -68,6 +70,8 @@ public abstract class TcpConnectionSupport implements TcpConnection {
6870

6971
private final SocketInfo socketInfo;
7072

73+
private final List<TcpSender> senders = Collections.synchronizedList(new ArrayList<>());
74+
7175
@SuppressWarnings("rawtypes")
7276
private Deserializer deserializer;
7377

@@ -80,8 +84,6 @@ public abstract class TcpConnectionSupport implements TcpConnection {
8084

8185
private volatile TcpListener testListener;
8286

83-
private TcpSender sender;
84-
8587
private String connectionId;
8688

8789
private String hostName = "unknown";
@@ -162,8 +164,8 @@ void setTestFailed(boolean testFailed) {
162164
*/
163165
@Override
164166
public void close() {
165-
if (this.sender != null) {
166-
this.sender.removeDeadConnection(this);
167+
for (TcpSender sender : this.senders) {
168+
sender.removeDeadConnection(this);
167169
}
168170
// close() may be called multiple times; only publish once
169171
if (!this.closePublished.getAndSet(true)) {
@@ -297,11 +299,25 @@ public void enableManualListenerRegistration() {
297299
* Registers a sender. Used on server side connections so a
298300
* sender can determine which connection to send a reply
299301
* to.
300-
* @param sender the sender.
302+
* @param senderToRegister the sender.
301303
*/
302-
public void registerSender(@Nullable TcpSender sender) {
303-
this.sender = sender;
304-
if (sender != null) {
304+
public void registerSender(@Nullable TcpSender senderToRegister) {
305+
if (senderToRegister != null) {
306+
this.senders.add(senderToRegister);
307+
senderToRegister.addNewConnection(this);
308+
}
309+
}
310+
311+
/**
312+
* Registers the senders. Used on server side connections so a
313+
* sender can determine which connection to send a reply
314+
* to.
315+
* @param sendersToRegister the sender.
316+
* @since 5.4
317+
*/
318+
public void registerSenders(List<TcpSender> sendersToRegister) {
319+
this.senders.addAll(sendersToRegister);
320+
for (TcpSender sender : sendersToRegister) {
305321
sender.addNewConnection(this);
306322
}
307323
}
@@ -336,10 +352,20 @@ private void waitForListenerRegistration() {
336352
}
337353

338354
/**
339-
* @return the sender
355+
* @return the first sender, if present.
340356
*/
357+
@Nullable
341358
public TcpSender getSender() {
342-
return this.sender;
359+
return this.senders.size() > 0 ? this.senders.get(0) : null;
360+
}
361+
362+
/**
363+
* Return the list of senders.
364+
* @return the senders.
365+
* @since 5.4
366+
*/
367+
public List<TcpSender> getSenders() {
368+
return Collections.unmodifiableList(this.senders);
343369
}
344370

345371
@Override

0 commit comments

Comments
 (0)