Skip to content

Commit bb2c947

Browse files
kkosiackiEddieChoCho
authored andcommitted
spring-projectsGH-8898: Add AbstractRemoteFileStreamingMessageSource.clearFetchedCache
Fixes: spring-projects#8898 In some cases not all retched remote files are processed, and after changing the `SessionFactory` (e.g. `RotatingServerAdvice`) thy might not be processed on the next polling cycle
1 parent 0ff037b commit bb2c947

File tree

5 files changed

+72
-26
lines changed

5 files changed

+72
-26
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,16 +183,27 @@ public void start() {
183183
@Override
184184
public void stop() {
185185
if (this.running.compareAndSet(true, false)) {
186-
if (this.filter == null || this.filter.supportsSingleFileFiltering()) {
187-
this.toBeReceived.clear();
188-
}
189-
else {
190-
// remove unprocessed files from the queue (and filter)
191-
AbstractFileInfo<F> file = this.toBeReceived.poll();
192-
while (file != null) {
193-
resetFilterIfNecessary(file);
194-
file = this.toBeReceived.poll();
195-
}
186+
clearFetchedCache();
187+
}
188+
}
189+
190+
/**
191+
* Clear internal queue of fetched remote files.
192+
* This functionality might be useful in combination with a
193+
* {@link org.springframework.integration.file.remote.aop.RotatingServerAdvice},
194+
* when not all fetched files are processed in between rotations.
195+
* @since 6.4
196+
*/
197+
public void clearFetchedCache() {
198+
if (this.filter == null || this.filter.supportsSingleFileFiltering()) {
199+
this.toBeReceived.clear();
200+
}
201+
else {
202+
// remove unprocessed files from the queue (and filter)
203+
AbstractFileInfo<F> file = this.toBeReceived.poll();
204+
while (file != null) {
205+
resetFilterIfNecessary(file);
206+
file = this.toBeReceived.poll();
196207
}
197208
}
198209
}

spring-integration-file/src/test/java/org/springframework/integration/file/remote/RemoteFileStreamingMessageSourceTests.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717
package org.springframework.integration.file.remote;
1818

1919
import java.io.IOException;
20-
import java.io.InputStream;
2120
import java.io.UncheckedIOException;
2221
import java.util.Collection;
2322
import java.util.Comparator;
@@ -26,7 +25,6 @@
2625

2726
import org.junit.jupiter.api.Test;
2827

29-
import org.springframework.beans.factory.BeanFactory;
3028
import org.springframework.integration.file.filters.FileListFilter;
3129
import org.springframework.integration.file.remote.session.CachingSessionFactory;
3230
import org.springframework.integration.file.remote.session.Session;
@@ -37,6 +35,8 @@
3735
import static org.mockito.ArgumentMatchers.anyInt;
3836
import static org.mockito.ArgumentMatchers.anyString;
3937
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.times;
39+
import static org.mockito.Mockito.verify;
4040
import static org.mockito.Mockito.when;
4141

4242
/**
@@ -49,39 +49,65 @@
4949
public class RemoteFileStreamingMessageSourceTests {
5050

5151
@Test
52-
@SuppressWarnings("unchecked")
52+
public void fetchFilesFromRemoteAfterClearingFetchedCache() throws IOException {
53+
RemoteFileTemplate<String> remoteFileTemplate = mock();
54+
when(remoteFileTemplate.list("remoteDirectory")).thenReturn(new String[] {"file1", "file2"});
55+
Session<String> session = mock();
56+
when(session.readRaw(anyString())).thenReturn(mock());
57+
when(remoteFileTemplate.getSession()).thenReturn(session);
58+
59+
Comparator<String> comparator = mock();
60+
TestRemoteFileStreamingMessageSource testRemoteFileStreamingMessageSource =
61+
new TestRemoteFileStreamingMessageSource(remoteFileTemplate, comparator);
62+
63+
testRemoteFileStreamingMessageSource.setRemoteDirectory("remoteDirectory");
64+
testRemoteFileStreamingMessageSource.setBeanFactory(mock());
65+
testRemoteFileStreamingMessageSource.start();
66+
67+
assertThat(testRemoteFileStreamingMessageSource.doReceive(2))
68+
.isNotNull();
69+
70+
testRemoteFileStreamingMessageSource.clearFetchedCache();
71+
72+
assertThat(testRemoteFileStreamingMessageSource.doReceive(2))
73+
.isNotNull();
74+
75+
verify(remoteFileTemplate, times(2)).list("remoteDirectory");
76+
77+
}
78+
79+
@Test
5380
public void filterOutFilesNotAcceptedByFilter() throws IOException {
54-
RemoteFileTemplate<String> remoteFileTemplate = mock(RemoteFileTemplate.class);
81+
RemoteFileTemplate<String> remoteFileTemplate = mock();
5582
when(remoteFileTemplate.list("remoteDirectory")).thenReturn(new String[] {"file1", "file2"});
56-
Session<String> session = mock(Session.class);
57-
when(session.readRaw(anyString())).thenReturn(mock(InputStream.class));
83+
Session<String> session = mock();
84+
when(session.readRaw(anyString())).thenReturn(mock());
5885
when(remoteFileTemplate.getSession()).thenReturn(session);
5986

60-
FileListFilter<String> fileListFilter = mock(FileListFilter.class);
87+
FileListFilter<String> fileListFilter = mock();
6188
when(fileListFilter.supportsSingleFileFiltering()).thenReturn(true);
6289
when(fileListFilter.accept("file1")).thenReturn(false);
6390
when(fileListFilter.accept("file2")).thenReturn(false);
6491

65-
Comparator<String> comparator = mock(Comparator.class);
92+
Comparator<String> comparator = mock();
6693
TestRemoteFileStreamingMessageSource testRemoteFileStreamingMessageSource =
6794
new TestRemoteFileStreamingMessageSource(remoteFileTemplate, comparator);
6895

6996
testRemoteFileStreamingMessageSource.setFilter(fileListFilter);
7097
testRemoteFileStreamingMessageSource.setRemoteDirectory("remoteDirectory");
71-
testRemoteFileStreamingMessageSource.setBeanFactory(mock(BeanFactory.class));
98+
testRemoteFileStreamingMessageSource.setBeanFactory(mock());
7299
testRemoteFileStreamingMessageSource.start();
73100

74101
assertThat(testRemoteFileStreamingMessageSource.doReceive(-1)).isNull();
75102
}
76103

77104
@Test
78-
@SuppressWarnings("unchecked")
79105
public void sessionReturnedToCacheProperlyOnDoReceive() throws IOException {
80-
Session<String> session = mock(Session.class);
106+
Session<String> session = mock();
81107
when(session.readRaw(anyString())).thenThrow(IOException.class);
82108
when(session.list("remoteDirectory")).thenReturn(new String[] {"file1"});
83109

84-
SessionFactory<String> sessionFactory = mock(SessionFactory.class);
110+
SessionFactory<String> sessionFactory = mock();
85111
when(sessionFactory.getSession()).thenReturn(session);
86112

87113
CachingSessionFactory<String> cachingSessionFactory = new CachingSessionFactory<>(sessionFactory, 1);
@@ -91,7 +117,7 @@ public void sessionReturnedToCacheProperlyOnDoReceive() throws IOException {
91117
new TestRemoteFileStreamingMessageSource(remoteFileTemplate, null);
92118

93119
testRemoteFileStreamingMessageSource.setRemoteDirectory("remoteDirectory");
94-
testRemoteFileStreamingMessageSource.setBeanFactory(mock(BeanFactory.class));
120+
testRemoteFileStreamingMessageSource.setBeanFactory(mock());
95121
testRemoteFileStreamingMessageSource.start();
96122

97123
assertThatExceptionOfType(UncheckedIOException.class)

src/reference/antora/modules/ROOT/pages/sftp/max-fetch.adoc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,7 @@ If the poller is active when the property is changed, the change takes effect on
2525
Starting with version 5.1, the synchronizer can be provided with a `Comparator<?>`.
2626
This is useful when restricting the number of files fetched with `maxFetchSize`.
2727

28-
Also see general xref:sftp/inbound.adoc[SFTP Inbound Channel Adapter] chapter for information about `FileListFilter` configuration.
28+
Starting with version 6.4, the `AbstractRemoteFileStreamingMessageSource` has now a convenient `clearFetchedCache()` API to remove references from cache for not processed remote files.
29+
The references stay in cache because polling configuration does not allow to process all of them in one cycle, and the target `SessionFactory` might be changed between polling cycles, e.g. via `RotatingServerAdvice`.
2930

31+
Also see general xref:sftp/inbound.adoc[SFTP Inbound Channel Adapter] chapter for information about `FileListFilter` configuration.

src/reference/antora/modules/ROOT/pages/sftp/rotating-server-advice.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,3 +83,4 @@ public IntegrationFlow flow() {
8383

8484
IMPORTANT: Do not configure a `TaskExecutor` on the poller when using this advice; see xref:changes-4.1-4.2.adoc#x4.2-conditional-pollers[Conditional Pollers for Message Sources] for more information.
8585

86+
Also see a convenient `AbstractRemoteFileStreamingMessageSource.clearFetchedCache()` API when not all fetched files are processed withing a single polling cycle, but `SessionFactory` might be rotated to different one.

src/reference/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,10 @@ In general the project has been moved to the latest dependency versions.
1717
=== New Components
1818

1919
[[x6.4-general]]
20-
=== General Changes
20+
=== General Changes
21+
22+
[[x6.4-remote-files-changes]]
23+
=== Remote File Adapters Changes
24+
25+
The `AbstractRemoteFileStreamingMessageSource` has now a convenient `clearFetchedCache()` API to remove references from cache for not processed remote files.
26+
The references stay in cache because polling configuration does not allow to process all the fetched in one cycle, and the target `SessionFactory` might be changed between polling cycles, e.g. via `RotatingServerAdvice`.

0 commit comments

Comments
 (0)