Skip to content

Commit c78630e

Browse files
committed
GH-8967: maxFetchSize = 1 from the StandardRotationPolicy
Fixes: #8967 In the `fair` mode the `StandardRotationPolicy` re-configures an `AbstractFetchLimitingMessageSource` for a new directory (and possible new `ConnectionFactory`) in the `beforeReceive()`. However, with default `maxFetchSize` (or bigger than `1`), the `receive()`` would poll `toBeReceived` internal queue for files cached from the previous polling cycle. Since we rotate the source immediately to a new set of options, all those cached files don't make sense or even can cause the problem on fetching their content in case of `AbstractRemoteFileStreamingMessageSource` when we rotate to a new `ConnectionFactory`. * Call `fetchLimitingMessageSource.setMaxFetchSize(1);` in the `StandardRotationPolicy.beforeReceive()` when `fair && !this.initialized` **Auto-cherry-pick to `6.2.x` & `6.1.x`**
1 parent a5e9575 commit c78630e

File tree

2 files changed

+17
-10
lines changed

2 files changed

+17
-10
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/aop/StandardRotationPolicy.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,7 +24,9 @@
2424
import org.apache.commons.logging.Log;
2525
import org.apache.commons.logging.LogFactory;
2626

27+
import org.springframework.core.log.LogMessage;
2728
import org.springframework.integration.core.MessageSource;
29+
import org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource;
2830
import org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource;
2931
import org.springframework.integration.file.remote.session.DelegatingSessionFactory;
3032
import org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource;
@@ -33,9 +35,9 @@
3335
/**
3436
* Standard rotation policy; iterates over key/directory pairs; when the end is reached,
3537
* starts again at the beginning. If the fair option is true the rotation occurs on every
36-
* poll, regardless of result. Otherwise rotation occurs when the current pair returns no
38+
* poll, regardless of result. Otherwise, rotation occurs when the current pair returns no
3739
* message.
38-
*
40+
* <p>
3941
* Subclasses implement {@code onRotation(MessageSource<?> source)} to configure the
4042
* {@link MessageSource} on each rotation.
4143
*
@@ -78,6 +80,12 @@ public StandardRotationPolicy(DelegatingSessionFactory<?> factory, List<KeyDirec
7880
public void beforeReceive(MessageSource<?> source) {
7981
if (this.fair || !this.initialized) {
8082
configureSource(source);
83+
if (this.fair && !this.initialized
84+
&& source instanceof AbstractFetchLimitingMessageSource<?> fetchLimitingMessageSource) {
85+
86+
this.logger.info(LogMessage.format("Enforce 'maxFetchSize = 1' for '%s' in the 'fair' mode", source));
87+
fetchLimitingMessageSource.setMaxFetchSize(1);
88+
}
8189
this.initialized = true;
8290
}
8391
if (this.logger.isTraceEnabled()) {
@@ -142,11 +150,11 @@ protected void configureSource(MessageSource<?> source) {
142150
* @param source the MessageSource.
143151
*/
144152
protected void onRotation(MessageSource<?> source) {
145-
if (source instanceof AbstractRemoteFileStreamingMessageSource) {
146-
((AbstractRemoteFileStreamingMessageSource<?>) source).setRemoteDirectory(this.current.getDirectory());
153+
if (source instanceof AbstractRemoteFileStreamingMessageSource<?> streamingMessageSource) {
154+
streamingMessageSource.setRemoteDirectory(this.current.getDirectory());
147155
}
148-
else if (source instanceof AbstractInboundFileSynchronizingMessageSource) {
149-
((AbstractInboundFileSynchronizingMessageSource<?>) source).getSynchronizer()
156+
else if (source instanceof AbstractInboundFileSynchronizingMessageSource<?> synchronizingMessageSource) {
157+
synchronizingMessageSource.getSynchronizer()
150158
.setRemoteDirectory(this.current.getDirectory());
151159
}
152160
}

spring-integration-ftp/src/test/java/org/springframework/integration/ftp/inbound/RotatingServersTests.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -405,8 +405,7 @@ public RotatingServerAdvice advice() {
405405
public IntegrationFlow flow() {
406406
return IntegrationFlow.from(Ftp.inboundStreamingAdapter(new FtpRemoteFileTemplate(sf()))
407407
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
408-
.remoteDirectory(".")
409-
.maxFetchSize(1),
408+
.remoteDirectory("."),
410409
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
411410
.channel(MessageChannels.queue("files"))
412411
.get();

0 commit comments

Comments
 (0)