diff --git a/spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/DefaultSftpSessionFactory.java b/spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/DefaultSftpSessionFactory.java index 782223a87e8..21bcf80ba1a 100644 --- a/spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/DefaultSftpSessionFactory.java +++ b/spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/DefaultSftpSessionFactory.java @@ -36,13 +36,15 @@ import org.apache.sshd.common.SshConstants; import org.apache.sshd.common.config.keys.FilePasswordProvider; import org.apache.sshd.common.keyprovider.KeyIdentityProvider; +import org.apache.sshd.common.util.buffer.Buffer; import org.apache.sshd.common.util.io.resource.AbstractIoResource; import org.apache.sshd.common.util.io.resource.IoResource; import org.apache.sshd.common.util.net.SshdSocketAddress; import org.apache.sshd.common.util.security.SecurityUtils; import org.apache.sshd.sftp.client.SftpClient; -import org.apache.sshd.sftp.client.SftpClientFactory; +import org.apache.sshd.sftp.client.SftpErrorDataHandler; import org.apache.sshd.sftp.client.SftpVersionSelector; +import org.apache.sshd.sftp.client.impl.DefaultSftpClient; import org.springframework.core.io.Resource; import org.springframework.integration.file.remote.session.SessionFactory; @@ -281,8 +283,8 @@ public SftpSession getSession() { boolean freshSftpClient = false; if (sftpClient == null || !sftpClient.isOpen()) { sftpClient = - SftpClientFactory.instance() - .createSftpClient(initClientSession(), this.sftpVersionSelector); + new ConcurrentSftpClient(initClientSession(), this.sftpVersionSelector, + SftpErrorDataHandler.EMPTY); freshSftpClient = true; } sftpSession = new SftpSession(sftpClient); @@ -395,4 +397,31 @@ public void resetSharedSession() { this.sharedSftpClient = null; } + /** + * The {@link DefaultSftpClient} extension to lock the {@link #send(int, Buffer)} + * for concurrent interaction. + */ + private static class ConcurrentSftpClient extends DefaultSftpClient { + + private final Lock sendLock = new ReentrantLock(); + + ConcurrentSftpClient(ClientSession clientSession, SftpVersionSelector initialVersionSelector, + SftpErrorDataHandler errorDataHandler) throws IOException { + + super(clientSession, initialVersionSelector, errorDataHandler); + } + + @Override + public int send(int cmd, Buffer buffer) throws IOException { + this.sendLock.lock(); + try { + return super.send(cmd, buffer); + } + finally { + this.sendLock.unlock(); + } + } + + } + } diff --git a/spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java b/spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java index ddbeae10d36..80883331af7 100644 --- a/spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java +++ b/spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java @@ -22,8 +22,6 @@ import java.io.UncheckedIOException; import java.net.SocketAddress; import java.time.Duration; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -49,13 +47,10 @@ * @author Gary Russell * @author Artem Bilan * @author Christian Tzolov - * * @since 2.0 */ public class SftpSession implements Session { - private final Lock lock = new ReentrantLock(); - private final SftpClient sftpClient; public SftpSession(SftpClient sftpClient) { @@ -113,7 +108,7 @@ public Stream doList(String path) throws IOException { } } remoteDir = - remoteDir.length() > 0 && remoteDir.charAt(0) == '/' + !remoteDir.isEmpty() && remoteDir.charAt(0) == '/' ? remoteDir : this.sftpClient.canonicalPath(remoteDir); return StreamSupport.stream(this.sftpClient.readDir(remoteDir).spliterator(), false) @@ -138,30 +133,18 @@ public boolean finalizeRaw() { @Override public void write(InputStream inputStream, String destination) throws IOException { - this.lock.lock(); - try { - OutputStream outputStream = this.sftpClient.write(destination); - FileCopyUtils.copy(inputStream, outputStream); - } - finally { - this.lock.unlock(); - } + OutputStream outputStream = this.sftpClient.write(destination); + FileCopyUtils.copy(inputStream, outputStream); } @Override public void append(InputStream inputStream, String destination) throws IOException { - this.lock.lock(); - try { - OutputStream outputStream = - this.sftpClient.write(destination, - SftpClient.OpenMode.Create, - SftpClient.OpenMode.Write, - SftpClient.OpenMode.Append); - FileCopyUtils.copy(inputStream, outputStream); - } - finally { - this.lock.unlock(); - } + OutputStream outputStream = + this.sftpClient.write(destination, + SftpClient.OpenMode.Create, + SftpClient.OpenMode.Write, + SftpClient.OpenMode.Append); + FileCopyUtils.copy(inputStream, outputStream); } @Override diff --git a/spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java b/spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java index 187beee44bd..80578d29663 100644 --- a/spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java +++ b/spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java @@ -18,11 +18,13 @@ import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; import java.net.ConnectException; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.IntStream; import org.apache.sshd.client.SshClient; import org.apache.sshd.client.auth.password.PasswordIdentityProvider; @@ -30,6 +32,7 @@ import org.apache.sshd.common.SshException; import org.apache.sshd.server.SshServer; import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider; +import org.apache.sshd.sftp.client.SftpClient; import org.apache.sshd.sftp.server.SftpSubsystemFactory; import org.junit.jupiter.api.Test; @@ -45,7 +48,6 @@ * @author Gary Russell * @author Artem Bilan * @author Auke Zaaiman - * * @since 3.0.2 */ public class SftpSessionFactoryTests { @@ -154,4 +156,40 @@ void externallyProvidedSshClientShouldNotHaveItsConfigurationOverwritten() throw } } + @Test + void concurrentSessionListDoesntCauseFailure() throws IOException { + try (SshServer server = SshServer.setUpDefaultServer()) { + server.setPasswordAuthenticator((arg0, arg1, arg2) -> true); + server.setPort(0); + server.setKeyPairProvider(new SimpleGeneratorHostKeyProvider(new File("hostkey.ser").toPath())); + server.setSubsystemFactories(Collections.singletonList(new SftpSubsystemFactory())); + server.start(); + + DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory(); + sftpSessionFactory.setHost("localhost"); + sftpSessionFactory.setPort(server.getPort()); + sftpSessionFactory.setUser("user"); + sftpSessionFactory.setPassword("pass"); + sftpSessionFactory.setAllowUnknownKeys(true); + + SftpSession session = sftpSessionFactory.getSession(); + + List dirEntries = + IntStream.range(0, 10) + .boxed() + .parallel() + .map(i -> { + try { + return session.list("."); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + }) + .toList(); + + assertThat(dirEntries).hasSize(10); + } + } + }