-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH-3326: TCP: Support Unsolicited Server Messages #3327
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -91,6 +91,17 @@ public class TcpOutboundGateway extends AbstractReplyProducingMessageHandler | |
|
||
private boolean closeStreamAfterSend; | ||
|
||
private String unsolicitedMessageChannelName; | ||
|
||
private MessageChannel unsolicitedMessageChannel; | ||
|
||
public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) { | ||
this.connectionFactory = connectionFactory; | ||
connectionFactory.registerListener(this); | ||
connectionFactory.registerSender(this); | ||
this.isSingleUse = connectionFactory.isSingleUse(); | ||
} | ||
|
||
/** | ||
* @param requestTimeout the requestTimeout to set | ||
*/ | ||
|
@@ -118,14 +129,53 @@ public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) | |
this.evaluationContextSet = true; | ||
} | ||
|
||
@Override | ||
protected void doInit() { | ||
super.doInit(); | ||
if (!this.evaluationContextSet) { | ||
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); | ||
} | ||
Assert.state(!this.closeStreamAfterSend || this.isSingleUse, | ||
"Single use connection needed with closeStreamAfterSend"); | ||
/** | ||
* Specify the Spring Integration reply channel. If this property is not | ||
* set the gateway will check for a 'replyChannel' header on the request. | ||
* @param replyChannel The reply channel. | ||
*/ | ||
public void setReplyChannel(MessageChannel replyChannel) { | ||
this.setOutputChannel(replyChannel); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
/** | ||
* Specify the Spring Integration reply channel name. If this property is not | ||
* set the gateway will check for a 'replyChannel' header on the request. | ||
* @param replyChannel The reply channel. | ||
* @since 5.0 | ||
*/ | ||
public void setReplyChannelName(String replyChannel) { | ||
this.setOutputChannelName(replyChannel); | ||
} | ||
|
||
/** | ||
* Set the channel name for unsolicited incoming messages, or late replies. | ||
* @param unsolicitedMessageChannelName the channel name. | ||
* @since 5.4 | ||
*/ | ||
public void setUnsolicitedMessageChannelName(String unsolicitedMessageChannelName) { | ||
this.unsolicitedMessageChannelName = unsolicitedMessageChannelName; | ||
} | ||
|
||
/** | ||
* Set the channel for unsolicited incoming messages, or late replies. | ||
* @param unsolicitedMessageChannel the channel. | ||
* @since 5.4 | ||
*/ | ||
public void setUnsolicitedMessageChannel(MessageChannel unsolicitedMessageChannel) { | ||
this.unsolicitedMessageChannel = unsolicitedMessageChannel; | ||
} | ||
|
||
/** | ||
* Set to true to close the connection ouput stream after sending without | ||
* closing the connection. Use to signal EOF to the server, such as when using | ||
* a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}. | ||
* Requires a single-use connection factory. | ||
* @param closeStreamAfterSend true to close. | ||
* @since 5.2 | ||
*/ | ||
public void setCloseStreamAfterSend(boolean closeStreamAfterSend) { | ||
this.closeStreamAfterSend = closeStreamAfterSend; | ||
} | ||
|
||
/** | ||
|
@@ -140,6 +190,21 @@ public void setSecondChanceDelay(int secondChanceDelay) { | |
this.secondChanceDelay = secondChanceDelay; | ||
} | ||
|
||
@Override | ||
public String getComponentType() { | ||
return "ip:tcp-outbound-gateway"; | ||
} | ||
|
||
@Override | ||
protected void doInit() { | ||
super.doInit(); | ||
if (!this.evaluationContextSet) { | ||
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); | ||
} | ||
Assert.state(!this.closeStreamAfterSend || this.isSingleUse, | ||
"Single use connection needed with closeStreamAfterSend"); | ||
} | ||
|
||
@Override | ||
protected Object handleRequestMessage(Message<?> requestMessage) { | ||
Assert.notNull(this.connectionFactory, this.getClass().getName() + | ||
|
@@ -260,6 +325,9 @@ private void cleanUp(boolean haveSemaphore, TcpConnection connection, String con | |
public boolean onMessage(Message<?> message) { | ||
String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class); | ||
if (connectionId == null) { | ||
if (unsolicitedSupported(message)) { | ||
return false; | ||
} | ||
logger.error("Cannot correlate response - no connection id"); | ||
publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id"); | ||
return false; | ||
|
@@ -277,6 +345,9 @@ public boolean onMessage(Message<?> message) { | |
return false; | ||
} | ||
else { | ||
if (unsolicitedSupported(message)) { | ||
return false; | ||
} | ||
String errorMessage = "Cannot correlate response - no pending reply for " + connectionId; | ||
logger.error(errorMessage); | ||
publishNoConnectionEvent(message, connectionId, errorMessage); | ||
|
@@ -293,6 +364,24 @@ public boolean onMessage(Message<?> message) { | |
return false; | ||
} | ||
|
||
private boolean unsolicitedSupported(Message<?> message) { | ||
String channelName = this.unsolicitedMessageChannelName; | ||
if (channelName != null) { | ||
this.unsolicitedMessageChannel = getChannelResolver().resolveDestination(channelName); | ||
this.unsolicitedMessageChannelName = null; | ||
} | ||
if (this.unsolicitedMessageChannel != null) { | ||
try { | ||
this.messagingTemplate.send(this.unsolicitedMessageChannel, message); | ||
} | ||
catch (Exception e) { | ||
logger.error("Failed to send unsolicited message " + message, e); | ||
} | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) { | ||
ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher(); | ||
if (applicationEventPublisher != null) { | ||
|
@@ -301,13 +390,6 @@ private void publishNoConnectionEvent(Message<?> message, String connectionId, S | |
} | ||
} | ||
|
||
public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) { | ||
this.connectionFactory = connectionFactory; | ||
connectionFactory.registerListener(this); | ||
connectionFactory.registerSender(this); | ||
this.isSingleUse = connectionFactory.isSingleUse(); | ||
} | ||
|
||
@Override | ||
public void addNewConnection(TcpConnection connection) { | ||
// do nothing - no asynchronous multiplexing supported | ||
|
@@ -318,42 +400,6 @@ public void removeDeadConnection(TcpConnection connection) { | |
// do nothing - no asynchronous multiplexing supported | ||
} | ||
|
||
/** | ||
* Specify the Spring Integration reply channel. If this property is not | ||
* set the gateway will check for a 'replyChannel' header on the request. | ||
* @param replyChannel The reply channel. | ||
*/ | ||
public void setReplyChannel(MessageChannel replyChannel) { | ||
this.setOutputChannel(replyChannel); | ||
} | ||
|
||
/** | ||
* Specify the Spring Integration reply channel name. If this property is not | ||
* set the gateway will check for a 'replyChannel' header on the request. | ||
* @param replyChannel The reply channel. | ||
* @since 5.0 | ||
*/ | ||
public void setReplyChannelName(String replyChannel) { | ||
this.setOutputChannelName(replyChannel); | ||
} | ||
|
||
/** | ||
* Set to true to close the connection ouput stream after sending without | ||
* closing the connection. Use to signal EOF to the server, such as when using | ||
* a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}. | ||
* Requires a single-use connection factory. | ||
* @param closeStreamAfterSend true to close. | ||
* @since 5.2 | ||
*/ | ||
public void setCloseStreamAfterSend(boolean closeStreamAfterSend) { | ||
this.closeStreamAfterSend = closeStreamAfterSend; | ||
} | ||
|
||
@Override | ||
public String getComponentType() { | ||
return "ip:tcp-outbound-gateway"; | ||
} | ||
|
||
@Override | ||
public void start() { | ||
this.connectionFactory.start(); | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you have there this setter as well
setUnsolicitedMessageChannel(MessageChannel unsolicitedMessageChannel)
. Don't we need to have similar option here on DSL level?Thanks