Skip to content

GH-3326: TCP: Support Unsolicited Server Messages #3327

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ public TcpOutboundGatewaySpec async(boolean async) {
return this;
}

public TcpOutboundGatewaySpec unsolictedMessageChannelName(String channelName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you have there this setter as well setUnsolicitedMessageChannel(MessageChannel unsolicitedMessageChannel). Don't we need to have similar option here on DSL level?
Thanks

this.target.setUnsolicitedMessageChannelName(channelName);
return this;
}

@Override
public Map<Object, String> getComponentsToRegister() {
return this.connectionFactory != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ public class TcpOutboundGateway extends AbstractReplyProducingMessageHandler

private boolean closeStreamAfterSend;

private String unsolicitedMessageChannelName;

private MessageChannel unsolicitedMessageChannel;

public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
connectionFactory.registerListener(this);
connectionFactory.registerSender(this);
this.isSingleUse = connectionFactory.isSingleUse();
}

/**
* @param requestTimeout the requestTimeout to set
*/
Expand Down Expand Up @@ -118,14 +129,53 @@ public void setIntegrationEvaluationContext(EvaluationContext evaluationContext)
this.evaluationContextSet = true;
}

@Override
protected void doInit() {
super.doInit();
if (!this.evaluationContextSet) {
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
}
Assert.state(!this.closeStreamAfterSend || this.isSingleUse,
"Single use connection needed with closeStreamAfterSend");
/**
* Specify the Spring Integration reply channel. If this property is not
* set the gateway will check for a 'replyChannel' header on the request.
* @param replyChannel The reply channel.
*/
public void setReplyChannel(MessageChannel replyChannel) {
this.setOutputChannel(replyChannel);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this. on method calls?
I guess you have just moved those existing setter over here for better class structure.
Nevertheless can we fix this. as well since we are already here in a change?

}

/**
* Specify the Spring Integration reply channel name. If this property is not
* set the gateway will check for a 'replyChannel' header on the request.
* @param replyChannel The reply channel.
* @since 5.0
*/
public void setReplyChannelName(String replyChannel) {
this.setOutputChannelName(replyChannel);
}

/**
* Set the channel name for unsolicited incoming messages, or late replies.
* @param unsolicitedMessageChannelName the channel name.
* @since 5.4
*/
public void setUnsolicitedMessageChannelName(String unsolicitedMessageChannelName) {
this.unsolicitedMessageChannelName = unsolicitedMessageChannelName;
}

/**
* Set the channel for unsolicited incoming messages, or late replies.
* @param unsolicitedMessageChannel the channel.
* @since 5.4
*/
public void setUnsolicitedMessageChannel(MessageChannel unsolicitedMessageChannel) {
this.unsolicitedMessageChannel = unsolicitedMessageChannel;
}

/**
* Set to true to close the connection ouput stream after sending without
* closing the connection. Use to signal EOF to the server, such as when using
* a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}.
* Requires a single-use connection factory.
* @param closeStreamAfterSend true to close.
* @since 5.2
*/
public void setCloseStreamAfterSend(boolean closeStreamAfterSend) {
this.closeStreamAfterSend = closeStreamAfterSend;
}

/**
Expand All @@ -140,6 +190,21 @@ public void setSecondChanceDelay(int secondChanceDelay) {
this.secondChanceDelay = secondChanceDelay;
}

@Override
public String getComponentType() {
return "ip:tcp-outbound-gateway";
}

@Override
protected void doInit() {
super.doInit();
if (!this.evaluationContextSet) {
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
}
Assert.state(!this.closeStreamAfterSend || this.isSingleUse,
"Single use connection needed with closeStreamAfterSend");
}

@Override
protected Object handleRequestMessage(Message<?> requestMessage) {
Assert.notNull(this.connectionFactory, this.getClass().getName() +
Expand Down Expand Up @@ -260,6 +325,9 @@ private void cleanUp(boolean haveSemaphore, TcpConnection connection, String con
public boolean onMessage(Message<?> message) {
String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class);
if (connectionId == null) {
if (unsolicitedSupported(message)) {
return false;
}
logger.error("Cannot correlate response - no connection id");
publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id");
return false;
Expand All @@ -277,6 +345,9 @@ public boolean onMessage(Message<?> message) {
return false;
}
else {
if (unsolicitedSupported(message)) {
return false;
}
String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
logger.error(errorMessage);
publishNoConnectionEvent(message, connectionId, errorMessage);
Expand All @@ -293,6 +364,24 @@ public boolean onMessage(Message<?> message) {
return false;
}

private boolean unsolicitedSupported(Message<?> message) {
String channelName = this.unsolicitedMessageChannelName;
if (channelName != null) {
this.unsolicitedMessageChannel = getChannelResolver().resolveDestination(channelName);
this.unsolicitedMessageChannelName = null;
}
if (this.unsolicitedMessageChannel != null) {
try {
this.messagingTemplate.send(this.unsolicitedMessageChannel, message);
}
catch (Exception e) {
logger.error("Failed to send unsolicited message " + message, e);
}
return true;
}
return false;
}

private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) {
ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher();
if (applicationEventPublisher != null) {
Expand All @@ -301,13 +390,6 @@ private void publishNoConnectionEvent(Message<?> message, String connectionId, S
}
}

public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
connectionFactory.registerListener(this);
connectionFactory.registerSender(this);
this.isSingleUse = connectionFactory.isSingleUse();
}

@Override
public void addNewConnection(TcpConnection connection) {
// do nothing - no asynchronous multiplexing supported
Expand All @@ -318,42 +400,6 @@ public void removeDeadConnection(TcpConnection connection) {
// do nothing - no asynchronous multiplexing supported
}

/**
* Specify the Spring Integration reply channel. If this property is not
* set the gateway will check for a 'replyChannel' header on the request.
* @param replyChannel The reply channel.
*/
public void setReplyChannel(MessageChannel replyChannel) {
this.setOutputChannel(replyChannel);
}

/**
* Specify the Spring Integration reply channel name. If this property is not
* set the gateway will check for a 'replyChannel' header on the request.
* @param replyChannel The reply channel.
* @since 5.0
*/
public void setReplyChannelName(String replyChannel) {
this.setOutputChannelName(replyChannel);
}

/**
* Set to true to close the connection ouput stream after sending without
* closing the connection. Use to signal EOF to the server, such as when using
* a {@link org.springframework.integration.ip.tcp.serializer.ByteArrayRawSerializer}.
* Requires a single-use connection factory.
* @param closeStreamAfterSend true to close.
* @since 5.2
*/
public void setCloseStreamAfterSend(boolean closeStreamAfterSend) {
this.closeStreamAfterSend = closeStreamAfterSend;
}

@Override
public String getComponentType() {
return "ip:tcp-outbound-gateway";
}

@Override
public void start() {
this.connectionFactory.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,7 @@ protected void initializeConnection(TcpConnectionSupport connection, Socket sock
connection.registerListener(listener);
}
}
TcpSender sender = getSender();
if (sender != null) {
connection.registerSender(sender);
}
connection.registerSenders(getSenders());
connection.setMapper(getMapper());
connection.setDeserializer(getDeserializer());
connection.setSerializer(getSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ public abstract class AbstractConnectionFactory extends IntegrationObjectSupport

private final BlockingQueue<PendingIO> delayedReads = new LinkedBlockingQueue<>();

private final List<TcpSender> senders = Collections.synchronizedList(new ArrayList<>());

private String host;

private int port;

private TcpListener listener;

private TcpSender sender;

private int soTimeout = -1;

private int soSendBufferSize;
Expand Down Expand Up @@ -326,11 +326,20 @@ public TcpListener getListener() {
}

/**
* @return the sender
* @return the first sender, if present.
*/
@Nullable
public TcpSender getSender() {
return this.sender;
return this.senders.size() > 0 ? this.senders.get(0) : null;
}

/**
* Return the list of senders.
* @return the senders.
* @since 5.4
*/
public List<TcpSender> getSenders() {
return Collections.unmodifiableList(this.senders);
}

/**
Expand Down Expand Up @@ -372,8 +381,16 @@ public void registerListener(TcpListener listenerToRegister) {
* @param senderToRegister The sender
*/
public void registerSender(TcpSender senderToRegister) {
Assert.isNull(this.sender, this.getClass().getName() + " may only be used by one outbound adapter");
this.sender = senderToRegister;
this.senders.add(senderToRegister);
}

/**
* Unregister a TcpSender.
* @param sender the sender.
* @return true if the sender was registered.
*/
public boolean unregisterSender(TcpSender sender) {
return this.senders.remove(sender);
}

/**
Expand Down Expand Up @@ -600,7 +617,7 @@ protected TcpConnectionSupport wrapConnection(TcpConnectionSupport connectionArg
if (this.listener == null) {
connection.registerListener(wrapper);
}
if (this.sender == null) {
if (this.senders.size() == 0) {
connection.registerSender(wrapper);
}
connection = wrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ protected void initializeConnection(TcpConnectionSupport connection, Socket sock
if (listener != null) {
connection.registerListener(listener);
}
connection.registerSender(getSender());
connection.registerSenders(getSenders());
connection.setMapper(getMapper());
connection.setDeserializer(getDeserializer());
connection.setSerializer(getSerializer());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -68,6 +70,8 @@ public abstract class TcpConnectionSupport implements TcpConnection {

private final SocketInfo socketInfo;

private final List<TcpSender> senders = Collections.synchronizedList(new ArrayList<>());

@SuppressWarnings("rawtypes")
private Deserializer deserializer;

Expand All @@ -80,8 +84,6 @@ public abstract class TcpConnectionSupport implements TcpConnection {

private volatile TcpListener testListener;

private TcpSender sender;

private String connectionId;

private String hostName = "unknown";
Expand Down Expand Up @@ -162,8 +164,8 @@ void setTestFailed(boolean testFailed) {
*/
@Override
public void close() {
if (this.sender != null) {
this.sender.removeDeadConnection(this);
for (TcpSender sender : this.senders) {
sender.removeDeadConnection(this);
}
// close() may be called multiple times; only publish once
if (!this.closePublished.getAndSet(true)) {
Expand Down Expand Up @@ -297,11 +299,25 @@ public void enableManualListenerRegistration() {
* Registers a sender. Used on server side connections so a
* sender can determine which connection to send a reply
* to.
* @param sender the sender.
* @param senderToRegister the sender.
*/
public void registerSender(@Nullable TcpSender sender) {
this.sender = sender;
if (sender != null) {
public void registerSender(@Nullable TcpSender senderToRegister) {
if (senderToRegister != null) {
this.senders.add(senderToRegister);
senderToRegister.addNewConnection(this);
}
}

/**
* Registers the senders. Used on server side connections so a
* sender can determine which connection to send a reply
* to.
* @param sendersToRegister the sender.
* @since 5.4
*/
public void registerSenders(List<TcpSender> sendersToRegister) {
this.senders.addAll(sendersToRegister);
for (TcpSender sender : sendersToRegister) {
sender.addNewConnection(this);
}
}
Expand Down Expand Up @@ -336,10 +352,20 @@ private void waitForListenerRegistration() {
}

/**
* @return the sender
* @return the first sender, if present.
*/
@Nullable
public TcpSender getSender() {
return this.sender;
return this.senders.size() > 0 ? this.senders.get(0) : null;
}

/**
* Return the list of senders.
* @return the senders.
* @since 5.4
*/
public List<TcpSender> getSenders() {
return Collections.unmodifiableList(this.senders);
}

@Override
Expand Down
Loading