diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java index 10c5a887794..d267a2352dc 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -86,6 +86,8 @@ public abstract class AbstractRemoteFileStreamingMessageSource */ private FileListFilter filter; + private boolean strictOrder; + protected AbstractRemoteFileStreamingMessageSource(RemoteFileTemplate template, @Nullable Comparator comparator) { @@ -133,7 +135,8 @@ protected final void doSetFilter(FileListFilter filterToSet) { } /** - * Set to false to add the {@link FileHeaders#REMOTE_FILE_INFO} header to the raw {@link FileInfo}. + * Set to {@code false} to add the {@link FileHeaders#REMOTE_FILE_INFO} + * header to the raw {@link FileInfo}. * Default is true meaning that common file information properties are provided * in that header as JSON. * @param fileInfoJson false to set the raw object. @@ -143,6 +146,18 @@ public void setFileInfoJson(boolean fileInfoJson) { this.fileInfoJson = fileInfoJson; } + /** + * The flag indicating if the local cache has to be fully clear on failure + * to preserve a processing order of remote files on the next {@link #receive()} attempt. + * By default, only the failed file will be re-fetched from remote directory, + * but only when local cache is already empty, essential out of order. + * @param strictOrder if cached files has to be cleared on failure. + * @since 5.5.17 + */ + public void setStrictOrder(boolean strictOrder) { + this.strictOrder = strictOrder; + } + protected RemoteFileTemplate getRemoteFileTemplate() { return this.remoteFileTemplate; } @@ -235,9 +250,19 @@ private Object remoteFileToMessage(AbstractFileInfo file) { throw new UncheckedIOException("IOException when retrieving " + remotePath, e); } } - catch (RuntimeException e) { - resetFilterIfNecessary(file); - throw e; + catch (RuntimeException ex) { + if (this.strictOrder) { + // If we could not fetch the file content, then it is fatal. + // Clear local queue to be refreshed on the next 'receive()' call. + List> filesToReset = new ArrayList<>(); + filesToReset.add(file); + this.toBeReceived.drainTo(filesToReset); + filesToReset.forEach(this::resetFilterIfNecessary); + } + else { + resetFilterIfNecessary(file); + } + throw ex; } } @@ -250,8 +275,9 @@ protected AbstractFileInfo poll() { private void resetFilterIfNecessary(AbstractFileInfo file) { if (this.filter instanceof ResettableFileListFilter) { - this.logger.info(LogMessage.format("Removing the remote file '%s' from" - + "the filterfor a subsequent transfer attempt", file)); + this.logger.info( + LogMessage.format("Removing the remote file '%s' from the filter for a subsequent transfer attempt", + file.getFilename())); ((ResettableFileListFilter) this.filter).remove(file.getFileInfo()); } } diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java index 43b4670405c..ca6795fde36 100644 --- a/spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -243,6 +243,10 @@ public void testFilterReversedOnBadFetch() { .isThrownBy(streamer::receive); assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(1); assertThat(streamer.metadataMap).hasSize(0); + streamer.setStrictOrder(true); + assertThatExceptionOfType(UncheckedIOException.class) + .isThrownBy(streamer::receive); + assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(0); } public static class Streamer extends AbstractRemoteFileStreamingMessageSource { diff --git a/spring-integration-smb/src/main/java/org/springframework/integration/smb/session/SmbFileInfo.java b/spring-integration-smb/src/main/java/org/springframework/integration/smb/session/SmbFileInfo.java index 405faa06cbe..52b9710f41f 100644 --- a/spring-integration-smb/src/main/java/org/springframework/integration/smb/session/SmbFileInfo.java +++ b/spring-integration-smb/src/main/java/org/springframework/integration/smb/session/SmbFileInfo.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,8 +28,7 @@ import org.springframework.util.Assert; /** - * A {@link org.springframework.integration.file.remote.FileInfo} implementation for - * SMB. + * An {@link AbstractFileInfo} implementation for SMB protocol. * * @author Gregory Bragg * @author Artem Bilan @@ -156,13 +155,18 @@ public String getPermissions() { return sb.toString(); } - private static String aceToAllowFlag(ACE ace) { - return ace.isAllow() ? "Allow " : "Deny "; - } - @Override public SmbFile getFileInfo() { return this.smbFile; } + @Override + public String toString() { + return "SmbFileInfo{smbFile=" + this.smbFile + '}'; + } + + private static String aceToAllowFlag(ACE ace) { + return ace.isAllow() ? "Allow " : "Deny "; + } + }