Skip to content

Commit e4bacc3

Browse files
authored
GH-8708: Fix concurrency around SFTP client (#8709)
Fixes #8708 According to the `org.apache.sshd.common.channel.ChannelAsyncOutputStream.writeBuffer()` JavaDocs cannot be used concurrently. * Introduce internal `DefaultSftpSessionFactory.ConcurrentSftpClient` extension of the `DefaultSftpClient` to set a `Lock` around `super.send(cmd, buffer);` * Remove lock from the `SftpSession` since it now is managed by the mentioned `ConcurrentSftpClient` **Cherry-pick to `6.1.x` & `6.0.x`**
1 parent e900a53 commit e4bacc3

File tree

3 files changed

+80
-30
lines changed

3 files changed

+80
-30
lines changed

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/DefaultSftpSessionFactory.java

+32-3
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@
3636
import org.apache.sshd.common.SshConstants;
3737
import org.apache.sshd.common.config.keys.FilePasswordProvider;
3838
import org.apache.sshd.common.keyprovider.KeyIdentityProvider;
39+
import org.apache.sshd.common.util.buffer.Buffer;
3940
import org.apache.sshd.common.util.io.resource.AbstractIoResource;
4041
import org.apache.sshd.common.util.io.resource.IoResource;
4142
import org.apache.sshd.common.util.net.SshdSocketAddress;
4243
import org.apache.sshd.common.util.security.SecurityUtils;
4344
import org.apache.sshd.sftp.client.SftpClient;
44-
import org.apache.sshd.sftp.client.SftpClientFactory;
45+
import org.apache.sshd.sftp.client.SftpErrorDataHandler;
4546
import org.apache.sshd.sftp.client.SftpVersionSelector;
47+
import org.apache.sshd.sftp.client.impl.DefaultSftpClient;
4648

4749
import org.springframework.core.io.Resource;
4850
import org.springframework.integration.file.remote.session.SessionFactory;
@@ -281,8 +283,8 @@ public SftpSession getSession() {
281283
boolean freshSftpClient = false;
282284
if (sftpClient == null || !sftpClient.isOpen()) {
283285
sftpClient =
284-
SftpClientFactory.instance()
285-
.createSftpClient(initClientSession(), this.sftpVersionSelector);
286+
new ConcurrentSftpClient(initClientSession(), this.sftpVersionSelector,
287+
SftpErrorDataHandler.EMPTY);
286288
freshSftpClient = true;
287289
}
288290
sftpSession = new SftpSession(sftpClient);
@@ -395,4 +397,31 @@ public void resetSharedSession() {
395397
this.sharedSftpClient = null;
396398
}
397399

400+
/**
401+
* The {@link DefaultSftpClient} extension to lock the {@link #send(int, Buffer)}
402+
* for concurrent interaction.
403+
*/
404+
private static class ConcurrentSftpClient extends DefaultSftpClient {
405+
406+
private final Lock sendLock = new ReentrantLock();
407+
408+
ConcurrentSftpClient(ClientSession clientSession, SftpVersionSelector initialVersionSelector,
409+
SftpErrorDataHandler errorDataHandler) throws IOException {
410+
411+
super(clientSession, initialVersionSelector, errorDataHandler);
412+
}
413+
414+
@Override
415+
public int send(int cmd, Buffer buffer) throws IOException {
416+
this.sendLock.lock();
417+
try {
418+
return super.send(cmd, buffer);
419+
}
420+
finally {
421+
this.sendLock.unlock();
422+
}
423+
}
424+
425+
}
426+
398427
}

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java

+9-26
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
import java.io.UncheckedIOException;
2323
import java.net.SocketAddress;
2424
import java.time.Duration;
25-
import java.util.concurrent.locks.Lock;
26-
import java.util.concurrent.locks.ReentrantLock;
2725
import java.util.stream.Stream;
2826
import java.util.stream.StreamSupport;
2927

@@ -49,13 +47,10 @@
4947
* @author Gary Russell
5048
* @author Artem Bilan
5149
* @author Christian Tzolov
52-
*
5350
* @since 2.0
5451
*/
5552
public class SftpSession implements Session<SftpClient.DirEntry> {
5653

57-
private final Lock lock = new ReentrantLock();
58-
5954
private final SftpClient sftpClient;
6055

6156
public SftpSession(SftpClient sftpClient) {
@@ -113,7 +108,7 @@ public Stream<SftpClient.DirEntry> doList(String path) throws IOException {
113108
}
114109
}
115110
remoteDir =
116-
remoteDir.length() > 0 && remoteDir.charAt(0) == '/'
111+
!remoteDir.isEmpty() && remoteDir.charAt(0) == '/'
117112
? remoteDir
118113
: this.sftpClient.canonicalPath(remoteDir);
119114
return StreamSupport.stream(this.sftpClient.readDir(remoteDir).spliterator(), false)
@@ -138,30 +133,18 @@ public boolean finalizeRaw() {
138133

139134
@Override
140135
public void write(InputStream inputStream, String destination) throws IOException {
141-
this.lock.lock();
142-
try {
143-
OutputStream outputStream = this.sftpClient.write(destination);
144-
FileCopyUtils.copy(inputStream, outputStream);
145-
}
146-
finally {
147-
this.lock.unlock();
148-
}
136+
OutputStream outputStream = this.sftpClient.write(destination);
137+
FileCopyUtils.copy(inputStream, outputStream);
149138
}
150139

151140
@Override
152141
public void append(InputStream inputStream, String destination) throws IOException {
153-
this.lock.lock();
154-
try {
155-
OutputStream outputStream =
156-
this.sftpClient.write(destination,
157-
SftpClient.OpenMode.Create,
158-
SftpClient.OpenMode.Write,
159-
SftpClient.OpenMode.Append);
160-
FileCopyUtils.copy(inputStream, outputStream);
161-
}
162-
finally {
163-
this.lock.unlock();
164-
}
142+
OutputStream outputStream =
143+
this.sftpClient.write(destination,
144+
SftpClient.OpenMode.Create,
145+
SftpClient.OpenMode.Write,
146+
SftpClient.OpenMode.Append);
147+
FileCopyUtils.copy(inputStream, outputStream);
165148
}
166149

167150
@Override

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java

+39-1
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,21 @@
1818

1919
import java.io.File;
2020
import java.io.IOException;
21+
import java.io.UncheckedIOException;
2122
import java.net.ConnectException;
2223
import java.time.Duration;
2324
import java.util.ArrayList;
2425
import java.util.Collections;
2526
import java.util.List;
27+
import java.util.stream.IntStream;
2628

2729
import org.apache.sshd.client.SshClient;
2830
import org.apache.sshd.client.auth.password.PasswordIdentityProvider;
2931
import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier;
3032
import org.apache.sshd.common.SshException;
3133
import org.apache.sshd.server.SshServer;
3234
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
35+
import org.apache.sshd.sftp.client.SftpClient;
3336
import org.apache.sshd.sftp.server.SftpSubsystemFactory;
3437
import org.junit.jupiter.api.Test;
3538

@@ -45,7 +48,6 @@
4548
* @author Gary Russell
4649
* @author Artem Bilan
4750
* @author Auke Zaaiman
48-
*
4951
* @since 3.0.2
5052
*/
5153
public class SftpSessionFactoryTests {
@@ -154,4 +156,40 @@ void externallyProvidedSshClientShouldNotHaveItsConfigurationOverwritten() throw
154156
}
155157
}
156158

159+
@Test
160+
void concurrentSessionListDoesntCauseFailure() throws IOException {
161+
try (SshServer server = SshServer.setUpDefaultServer()) {
162+
server.setPasswordAuthenticator((arg0, arg1, arg2) -> true);
163+
server.setPort(0);
164+
server.setKeyPairProvider(new SimpleGeneratorHostKeyProvider(new File("hostkey.ser").toPath()));
165+
server.setSubsystemFactories(Collections.singletonList(new SftpSubsystemFactory()));
166+
server.start();
167+
168+
DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
169+
sftpSessionFactory.setHost("localhost");
170+
sftpSessionFactory.setPort(server.getPort());
171+
sftpSessionFactory.setUser("user");
172+
sftpSessionFactory.setPassword("pass");
173+
sftpSessionFactory.setAllowUnknownKeys(true);
174+
175+
SftpSession session = sftpSessionFactory.getSession();
176+
177+
List<SftpClient.DirEntry[]> dirEntries =
178+
IntStream.range(0, 10)
179+
.boxed()
180+
.parallel()
181+
.map(i -> {
182+
try {
183+
return session.list(".");
184+
}
185+
catch (IOException e) {
186+
throw new UncheckedIOException(e);
187+
}
188+
})
189+
.toList();
190+
191+
assertThat(dirEntries).hasSize(10);
192+
}
193+
}
194+
157195
}

0 commit comments

Comments
 (0)