Skip to content

Commit 5a93607

Browse files
committed
GH-8708: Fix concurrency around SFTP client (#8709)
Fixes #8708 According to the `org.apache.sshd.common.channel.ChannelAsyncOutputStream.writeBuffer()` JavaDocs cannot be used concurrently. * Introduce internal `DefaultSftpSessionFactory.ConcurrentSftpClient` extension of the `DefaultSftpClient` to set a `Lock` around `super.send(cmd, buffer);` * Remove lock from the `SftpSession` since it now is managed by the mentioned `ConcurrentSftpClient` **Cherry-pick to `6.1.x` & `6.0.x`** # Conflicts: # spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java
1 parent 86656de commit 5a93607

File tree

3 files changed

+81
-19
lines changed

3 files changed

+81
-19
lines changed

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/DefaultSftpSessionFactory.java

+32-3
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@
3636
import org.apache.sshd.common.SshConstants;
3737
import org.apache.sshd.common.config.keys.FilePasswordProvider;
3838
import org.apache.sshd.common.keyprovider.KeyIdentityProvider;
39+
import org.apache.sshd.common.util.buffer.Buffer;
3940
import org.apache.sshd.common.util.io.resource.AbstractIoResource;
4041
import org.apache.sshd.common.util.io.resource.IoResource;
4142
import org.apache.sshd.common.util.net.SshdSocketAddress;
4243
import org.apache.sshd.common.util.security.SecurityUtils;
4344
import org.apache.sshd.sftp.client.SftpClient;
44-
import org.apache.sshd.sftp.client.SftpClientFactory;
45+
import org.apache.sshd.sftp.client.SftpErrorDataHandler;
4546
import org.apache.sshd.sftp.client.SftpVersionSelector;
47+
import org.apache.sshd.sftp.client.impl.DefaultSftpClient;
4648

4749
import org.springframework.core.io.Resource;
4850
import org.springframework.integration.file.remote.session.SessionFactory;
@@ -278,8 +280,8 @@ public SftpSession getSession() {
278280
boolean freshSftpClient = false;
279281
if (sftpClient == null || !sftpClient.isOpen()) {
280282
sftpClient =
281-
SftpClientFactory.instance()
282-
.createSftpClient(initClientSession(), this.sftpVersionSelector);
283+
new ConcurrentSftpClient(initClientSession(), this.sftpVersionSelector,
284+
SftpErrorDataHandler.EMPTY);
283285
freshSftpClient = true;
284286
}
285287
sftpSession = new SftpSession(sftpClient);
@@ -388,4 +390,31 @@ public void resetSharedSession() {
388390
this.sharedSftpClient = null;
389391
}
390392

393+
/**
394+
* The {@link DefaultSftpClient} extension to lock the {@link #send(int, Buffer)}
395+
* for concurrent interaction.
396+
*/
397+
private static class ConcurrentSftpClient extends DefaultSftpClient {
398+
399+
private final Lock sendLock = new ReentrantLock();
400+
401+
ConcurrentSftpClient(ClientSession clientSession, SftpVersionSelector initialVersionSelector,
402+
SftpErrorDataHandler errorDataHandler) throws IOException {
403+
404+
super(clientSession, initialVersionSelector, errorDataHandler);
405+
}
406+
407+
@Override
408+
public int send(int cmd, Buffer buffer) throws IOException {
409+
this.sendLock.lock();
410+
try {
411+
return super.send(cmd, buffer);
412+
}
413+
finally {
414+
this.sendLock.unlock();
415+
}
416+
}
417+
418+
}
419+
391420
}

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java

+10-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -46,7 +46,6 @@
4646
* @author Oleg Zhurakousky
4747
* @author Gary Russell
4848
* @author Artem Bilan
49-
*
5049
* @since 2.0
5150
*/
5251
public class SftpSession implements Session<SftpClient.DirEntry> {
@@ -97,7 +96,7 @@ public Stream<SftpClient.DirEntry> doList(String path) throws IOException {
9796
}
9897
}
9998
remoteDir =
100-
remoteDir.length() > 0 && remoteDir.charAt(0) == '/'
99+
!remoteDir.isEmpty() && remoteDir.charAt(0) == '/'
101100
? remoteDir
102101
: this.sftpClient.canonicalPath(remoteDir);
103102
return StreamSupport.stream(this.sftpClient.readDir(remoteDir).spliterator(), false)
@@ -122,22 +121,18 @@ public boolean finalizeRaw() {
122121

123122
@Override
124123
public void write(InputStream inputStream, String destination) throws IOException {
125-
synchronized (this.sftpClient) {
126-
OutputStream outputStream = this.sftpClient.write(destination);
127-
FileCopyUtils.copy(inputStream, outputStream);
128-
}
124+
OutputStream outputStream = this.sftpClient.write(destination);
125+
FileCopyUtils.copy(inputStream, outputStream);
129126
}
130127

131128
@Override
132129
public void append(InputStream inputStream, String destination) throws IOException {
133-
synchronized (this.sftpClient) {
134-
OutputStream outputStream =
135-
this.sftpClient.write(destination,
136-
SftpClient.OpenMode.Create,
137-
SftpClient.OpenMode.Write,
138-
SftpClient.OpenMode.Append);
139-
FileCopyUtils.copy(inputStream, outputStream);
140-
}
130+
OutputStream outputStream =
131+
this.sftpClient.write(destination,
132+
SftpClient.OpenMode.Create,
133+
SftpClient.OpenMode.Write,
134+
SftpClient.OpenMode.Append);
135+
FileCopyUtils.copy(inputStream, outputStream);
141136
}
142137

143138
@Override

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java

+39-1
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,21 @@
1818

1919
import java.io.File;
2020
import java.io.IOException;
21+
import java.io.UncheckedIOException;
2122
import java.net.ConnectException;
2223
import java.time.Duration;
2324
import java.util.ArrayList;
2425
import java.util.Collections;
2526
import java.util.List;
27+
import java.util.stream.IntStream;
2628

2729
import org.apache.sshd.client.SshClient;
2830
import org.apache.sshd.client.auth.password.PasswordIdentityProvider;
2931
import org.apache.sshd.client.keyverifier.AcceptAllServerKeyVerifier;
3032
import org.apache.sshd.common.SshException;
3133
import org.apache.sshd.server.SshServer;
3234
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
35+
import org.apache.sshd.sftp.client.SftpClient;
3336
import org.apache.sshd.sftp.server.SftpSubsystemFactory;
3437
import org.junit.jupiter.api.Test;
3538

@@ -45,7 +48,6 @@
4548
* @author Gary Russell
4649
* @author Artem Bilan
4750
* @author Auke Zaaiman
48-
*
4951
* @since 3.0.2
5052
*/
5153
public class SftpSessionFactoryTests {
@@ -154,4 +156,40 @@ void externallyProvidedSshClientShouldNotHaveItsConfigurationOverwritten() throw
154156
}
155157
}
156158

159+
@Test
160+
void concurrentSessionListDoesntCauseFailure() throws IOException {
161+
try (SshServer server = SshServer.setUpDefaultServer()) {
162+
server.setPasswordAuthenticator((arg0, arg1, arg2) -> true);
163+
server.setPort(0);
164+
server.setKeyPairProvider(new SimpleGeneratorHostKeyProvider(new File("hostkey.ser").toPath()));
165+
server.setSubsystemFactories(Collections.singletonList(new SftpSubsystemFactory()));
166+
server.start();
167+
168+
DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
169+
sftpSessionFactory.setHost("localhost");
170+
sftpSessionFactory.setPort(server.getPort());
171+
sftpSessionFactory.setUser("user");
172+
sftpSessionFactory.setPassword("pass");
173+
sftpSessionFactory.setAllowUnknownKeys(true);
174+
175+
SftpSession session = sftpSessionFactory.getSession();
176+
177+
List<SftpClient.DirEntry[]> dirEntries =
178+
IntStream.range(0, 10)
179+
.boxed()
180+
.parallel()
181+
.map(i -> {
182+
try {
183+
return session.list(".");
184+
}
185+
catch (IOException e) {
186+
throw new UncheckedIOException(e);
187+
}
188+
})
189+
.toList();
190+
191+
assertThat(dirEntries).hasSize(10);
192+
}
193+
}
194+
157195
}

0 commit comments

Comments
 (0)