Skip to content

Commit abda16b

Browse files
committed
GH-3713: TCP: Fix Intercepted Sender List
Resolves #3713 #3326 added support for multiple `TcpSenders`. However, when connections are intercepted, the sender list was not properly chained through the interceptors. - override `registerSenders` and properly capture the real senders in the last interceptor and intermediate interceptors - this ensures that `addNewConnection` is called on each interceptor - when removing dead connections, use the (possibly intercepted) connection sender list insted of the factory's raw sender list. **cherry-pick to 5.5.x**
1 parent 98b2d1b commit abda16b

File tree

4 files changed

+55
-22
lines changed

4 files changed

+55
-22
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/AbstractConnectionFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -563,10 +563,10 @@ public void stop() {
563563
synchronized (this.connections) {
564564
Iterator<Entry<String, TcpConnectionSupport>> iterator = this.connections.entrySet().iterator();
565565
while (iterator.hasNext()) {
566-
TcpConnection connection = iterator.next().getValue();
566+
TcpConnectionSupport connection = iterator.next().getValue();
567567
connection.close();
568568
iterator.remove();
569-
getSenders().forEach(sender -> sender.removeDeadConnection(connection));
569+
connection.getSenders().forEach(sender -> sender.removeDeadConnection(connection));
570570
}
571571
}
572572
synchronized (this.lifecycleMonitor) {
@@ -866,7 +866,7 @@ private List<String> removeClosedConnectionsAndReturnOpenConnectionIds() {
866866
TcpConnectionSupport connection = entry.getValue();
867867
if (!connection.isOpen()) {
868868
iterator.remove();
869-
getSenders().forEach(sender -> sender.removeDeadConnection(connection));
869+
connection.getSenders().forEach(sender -> sender.removeDeadConnection(connection));
870870
logger.debug(() -> getComponentName() + ": Removed closed connection: " +
871871
connection.getConnectionId());
872872
}
@@ -944,7 +944,7 @@ public boolean closeConnection(String connectionId) {
944944
try {
945945
connection.close();
946946
closed = true;
947-
getSenders().forEach(sender -> sender.removeDeadConnection(connection));
947+
connection.getSenders().forEach(sender -> sender.removeDeadConnection(connection));
948948
}
949949
catch (Exception ex) {
950950
logger.debug(ex, () -> "Failed to close connection " + connectionId);

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionInterceptorSupport.java

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,11 +16,14 @@
1616

1717
package org.springframework.integration.ip.tcp.connection;
1818

19+
import java.util.List;
20+
1921
import javax.net.ssl.SSLSession;
2022

2123
import org.springframework.context.ApplicationEventPublisher;
2224
import org.springframework.core.serializer.Deserializer;
2325
import org.springframework.core.serializer.Serializer;
26+
import org.springframework.lang.Nullable;
2427
import org.springframework.messaging.Message;
2528
import org.springframework.messaging.support.ErrorMessage;
2629

@@ -38,9 +41,10 @@ public abstract class TcpConnectionInterceptorSupport extends TcpConnectionSuppo
3841

3942
private TcpListener tcpListener;
4043

41-
private TcpSender tcpSender;
44+
private boolean realSender;
45+
46+
private List<TcpSender> interceptedSenders;
4247

43-
private Boolean realSender;
4448

4549
public TcpConnectionInterceptorSupport() {
4650
}
@@ -92,10 +96,24 @@ public void registerListener(TcpListener listener) {
9296

9397
@Override
9498
public void registerSender(TcpSender sender) {
95-
this.tcpSender = sender;
9699
this.theConnection.registerSender(this);
97100
}
98101

102+
@Override
103+
public void registerSenders(List<TcpSender> sendersToRegister) {
104+
this.interceptedSenders = sendersToRegister;
105+
if (sendersToRegister.size() > 0) {
106+
if (!(sendersToRegister.get(0) instanceof TcpConnectionInterceptorSupport)) {
107+
this.realSender = true;
108+
}
109+
else {
110+
this.realSender = ((TcpConnectionInterceptorSupport) this.interceptedSenders.get(0))
111+
.hasRealSender();
112+
}
113+
}
114+
super.registerSender(this);
115+
}
116+
99117
/**
100118
* {@inheritDoc}
101119
* <p>
@@ -198,21 +216,22 @@ public void setTheConnection(TcpConnectionSupport theConnection) {
198216
* @return the listener
199217
*/
200218
@Override
219+
@Nullable
201220
public TcpListener getListener() {
202221
return this.tcpListener;
203222
}
204223

205224
@Override
206225
public void addNewConnection(TcpConnection connection) {
207-
if (this.tcpSender != null) {
208-
this.tcpSender.addNewConnection(this);
226+
if (this.interceptedSenders != null) {
227+
this.interceptedSenders.forEach(sender -> sender.addNewConnection(connection));
209228
}
210229
}
211230

212231
@Override
213232
public void removeDeadConnection(TcpConnection connection) {
214-
if (this.tcpSender != null) {
215-
this.tcpSender.removeDeadConnection(this);
233+
if (this.interceptedSenders != null) {
234+
this.interceptedSenders.forEach(sender -> sender.removeDeadConnection(connection));
216235
}
217236
}
218237

@@ -222,19 +241,14 @@ public long incrementAndGetConnectionSequence() {
222241
}
223242

224243
@Override
244+
@Nullable
225245
public TcpSender getSender() {
226-
return this.tcpSender;
246+
return this.interceptedSenders != null && this.interceptedSenders.size() > 0
247+
? this.interceptedSenders.get(0)
248+
: null;
227249
}
228250

229251
protected boolean hasRealSender() {
230-
if (this.realSender != null) {
231-
return this.realSender;
232-
}
233-
TcpSender sender = getSender();
234-
while (sender instanceof TcpConnectionInterceptorSupport) {
235-
sender = ((TcpConnectionInterceptorSupport) sender).getSender();
236-
}
237-
this.realSender = sender != null;
238252
return this.realSender;
239253
}
240254

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/TcpConnectionSupport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2001-2021 the original author or authors.
2+
* Copyright 2001-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -337,6 +337,7 @@ public void registerSenders(List<TcpSender> sendersToRegister) {
337337
* @return the listener
338338
*/
339339
@Override
340+
@Nullable
340341
public TcpListener getListener() {
341342
if (this.needsTest && this.testListener != null) {
342343
this.needsTest = false;

spring-integration-ip/src/test/java/org/springframework/integration/ip/tcp/connection/TcpSenderTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,27 @@ void senderCalledForDeadConnectionClientNio() throws InterruptedException {
6969
private void senderCalledForDeadConnectionClient(AbstractClientConnectionFactory client) throws InterruptedException {
7070
CountDownLatch adds = new CountDownLatch(2);
7171
CountDownLatch removes = new CountDownLatch(2);
72+
CountDownLatch interceptorAddCalled = new CountDownLatch(2);
73+
CountDownLatch interceptorRemCalled = new CountDownLatch(2);
7274
TcpConnectionInterceptorFactoryChain chain = new TcpConnectionInterceptorFactoryChain();
7375
chain.setInterceptor(new HelloWorldInterceptorFactory() {
7476

7577
@Override
7678
public TcpConnectionInterceptorSupport getInterceptor() {
7779
return new TcpConnectionInterceptorSupport() {
80+
81+
@Override
82+
public void addNewConnection(TcpConnection connection) {
83+
interceptorAddCalled.countDown();
84+
super.addNewConnection(connection);
85+
}
86+
87+
@Override
88+
public void removeDeadConnection(TcpConnection connection) {
89+
interceptorRemCalled.countDown();
90+
super.removeDeadConnection(connection);
91+
}
92+
7893
};
7994
}
8095

@@ -97,12 +112,15 @@ public void removeDeadConnection(TcpConnection connection) {
97112
client.afterPropertiesSet();
98113
client.start();
99114
TcpConnectionSupport conn = client.getConnection();
115+
assertThat(((TcpConnectionInterceptorSupport) conn).hasRealSender()).isTrue();
100116
conn.close();
101117
conn = client.getConnection();
102118
assertThat(adds.await(10, TimeUnit.SECONDS)).isTrue();
103119
conn.close();
104120
client.stop();
105121
assertThat(removes.await(10, TimeUnit.SECONDS)).isTrue();
122+
assertThat(interceptorAddCalled.await(10, TimeUnit.SECONDS)).isTrue();
123+
assertThat(interceptorRemCalled.await(10, TimeUnit.SECONDS)).isTrue();
106124
}
107125

108126
}

0 commit comments

Comments
 (0)