From d04002bc9c054835053d8fdeb24966f84543bfc5 Mon Sep 17 00:00:00 2001 From: abilan Date: Tue, 28 Feb 2023 15:36:48 -0500 Subject: [PATCH 1/2] GH-8562: Fix streaming source for remote calls Fixes https://github.com/spring-projects/spring-integration/issues/8562 The `AbstractRemoteFileStreamingMessageSource.doReceive()` takes files first from a `toBeReceived` queue. When `AbstractRemoteFileStreamingMessageSource.remoteFileToMessage()` fails to fetch the file content because of interim connection issue, we reset this file from a filter and rethrow an exception. The next `receive()` call will just go ahead to the next entry in the `toBeReceived` queue, but the file we have just failed for will be retried only on the next list call to the remove directory. This essentially breaks a possible in-order target application logic. * Introduce `AbstractRemoteFileStreamingMessageSource.strictOrder` option to clear the `toBeReceived` queue when we fail in the `remoteFileToMessage()`, so the next `receive()` call would re-fetch files from remote dir, because the filter has been reset for those files. * Fix `AbstractFileInfo.toString()` to not perform remote calls when we just log this file. For example, we reset the file for connection failure and log the message about it, but it fails again because we request `size` of the file which may require a remote connection. **Cherry-pick to `6.0.x` & `5.5.x`** --- .../file/remote/AbstractFileInfo.java | 12 +++--- ...tractRemoteFileStreamingMessageSource.java | 40 +++++++++++++++---- .../file/remote/StreamingInboundTests.java | 6 ++- 3 files changed, 43 insertions(+), 15 deletions(-) diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractFileInfo.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractFileInfo.java index fa3f11c4bba..1687a55328e 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractFileInfo.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractFileInfo.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,6 @@ package org.springframework.integration.file.remote; -import java.util.Date; - import org.springframework.integration.json.SimpleJsonSerializer; /** @@ -27,6 +25,7 @@ * @param The target protocol file type. * * @author Gary Russell + * @author Artem Bilan * * @since 2.1 */ @@ -57,10 +56,9 @@ public String toJson() { @Override public String toString() { - return "FileInfo [isDirectory=" + isDirectory() + ", isLink=" + isLink() - + ", Size=" + getSize() + ", ModifiedTime=" - + new Date(getModified()) + ", Filename=" + getFilename() - + ", RemoteDirectory=" + getRemoteDirectory() + ", Permissions=" + getPermissions() + "]"; + return "FileInfo [isLink=" + isLink() + + ", Filename=" + getFilename() + + ", RemoteDirectory=" + getRemoteDirectory() + "]"; } } diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java index 10c5a887794..d267a2352dc 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -86,6 +86,8 @@ public abstract class AbstractRemoteFileStreamingMessageSource */ private FileListFilter filter; + private boolean strictOrder; + protected AbstractRemoteFileStreamingMessageSource(RemoteFileTemplate template, @Nullable Comparator comparator) { @@ -133,7 +135,8 @@ protected final void doSetFilter(FileListFilter filterToSet) { } /** - * Set to false to add the {@link FileHeaders#REMOTE_FILE_INFO} header to the raw {@link FileInfo}. + * Set to {@code false} to add the {@link FileHeaders#REMOTE_FILE_INFO} + * header to the raw {@link FileInfo}. * Default is true meaning that common file information properties are provided * in that header as JSON. * @param fileInfoJson false to set the raw object. @@ -143,6 +146,18 @@ public void setFileInfoJson(boolean fileInfoJson) { this.fileInfoJson = fileInfoJson; } + /** + * The flag indicating if the local cache has to be fully clear on failure + * to preserve a processing order of remote files on the next {@link #receive()} attempt. + * By default, only the failed file will be re-fetched from remote directory, + * but only when local cache is already empty, essential out of order. + * @param strictOrder if cached files has to be cleared on failure. + * @since 5.5.17 + */ + public void setStrictOrder(boolean strictOrder) { + this.strictOrder = strictOrder; + } + protected RemoteFileTemplate getRemoteFileTemplate() { return this.remoteFileTemplate; } @@ -235,9 +250,19 @@ private Object remoteFileToMessage(AbstractFileInfo file) { throw new UncheckedIOException("IOException when retrieving " + remotePath, e); } } - catch (RuntimeException e) { - resetFilterIfNecessary(file); - throw e; + catch (RuntimeException ex) { + if (this.strictOrder) { + // If we could not fetch the file content, then it is fatal. + // Clear local queue to be refreshed on the next 'receive()' call. + List> filesToReset = new ArrayList<>(); + filesToReset.add(file); + this.toBeReceived.drainTo(filesToReset); + filesToReset.forEach(this::resetFilterIfNecessary); + } + else { + resetFilterIfNecessary(file); + } + throw ex; } } @@ -250,8 +275,9 @@ protected AbstractFileInfo poll() { private void resetFilterIfNecessary(AbstractFileInfo file) { if (this.filter instanceof ResettableFileListFilter) { - this.logger.info(LogMessage.format("Removing the remote file '%s' from" - + "the filterfor a subsequent transfer attempt", file)); + this.logger.info( + LogMessage.format("Removing the remote file '%s' from the filter for a subsequent transfer attempt", + file.getFilename())); ((ResettableFileListFilter) this.filter).remove(file.getFileInfo()); } } diff --git a/spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java index 43b4670405c..ca6795fde36 100644 --- a/spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java +++ b/spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -243,6 +243,10 @@ public void testFilterReversedOnBadFetch() { .isThrownBy(streamer::receive); assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(1); assertThat(streamer.metadataMap).hasSize(0); + streamer.setStrictOrder(true); + assertThatExceptionOfType(UncheckedIOException.class) + .isThrownBy(streamer::receive); + assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(0); } public static class Streamer extends AbstractRemoteFileStreamingMessageSource { From 331d53cf45c88d7adedcae57aa8834a30fe0a1ad Mon Sep 17 00:00:00 2001 From: abilan Date: Tue, 28 Feb 2023 16:12:34 -0500 Subject: [PATCH 2/2] * Revert `AbstractFileInfo` changes * Override `toString()` in `SmbFileInfo` instead - exactly the place where connection is used to obtain file attributes like `size` or `lastModified` --- .../file/remote/AbstractFileInfo.java | 12 +++++++----- .../integration/smb/session/SmbFileInfo.java | 18 +++++++++++------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractFileInfo.java b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractFileInfo.java index 1687a55328e..fa3f11c4bba 100644 --- a/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractFileInfo.java +++ b/spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractFileInfo.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,8 @@ package org.springframework.integration.file.remote; +import java.util.Date; + import org.springframework.integration.json.SimpleJsonSerializer; /** @@ -25,7 +27,6 @@ * @param The target protocol file type. * * @author Gary Russell - * @author Artem Bilan * * @since 2.1 */ @@ -56,9 +57,10 @@ public String toJson() { @Override public String toString() { - return "FileInfo [isLink=" + isLink() - + ", Filename=" + getFilename() - + ", RemoteDirectory=" + getRemoteDirectory() + "]"; + return "FileInfo [isDirectory=" + isDirectory() + ", isLink=" + isLink() + + ", Size=" + getSize() + ", ModifiedTime=" + + new Date(getModified()) + ", Filename=" + getFilename() + + ", RemoteDirectory=" + getRemoteDirectory() + ", Permissions=" + getPermissions() + "]"; } } diff --git a/spring-integration-smb/src/main/java/org/springframework/integration/smb/session/SmbFileInfo.java b/spring-integration-smb/src/main/java/org/springframework/integration/smb/session/SmbFileInfo.java index 405faa06cbe..52b9710f41f 100644 --- a/spring-integration-smb/src/main/java/org/springframework/integration/smb/session/SmbFileInfo.java +++ b/spring-integration-smb/src/main/java/org/springframework/integration/smb/session/SmbFileInfo.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 the original author or authors. + * Copyright 2022-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,8 +28,7 @@ import org.springframework.util.Assert; /** - * A {@link org.springframework.integration.file.remote.FileInfo} implementation for - * SMB. + * An {@link AbstractFileInfo} implementation for SMB protocol. * * @author Gregory Bragg * @author Artem Bilan @@ -156,13 +155,18 @@ public String getPermissions() { return sb.toString(); } - private static String aceToAllowFlag(ACE ace) { - return ace.isAllow() ? "Allow " : "Deny "; - } - @Override public SmbFile getFileInfo() { return this.smbFile; } + @Override + public String toString() { + return "SmbFileInfo{smbFile=" + this.smbFile + '}'; + } + + private static String aceToAllowFlag(ACE ace) { + return ace.isAllow() ? "Allow " : "Deny "; + } + }