Skip to content

Commit efb1972

Browse files
committed
spring-projectsGH-8577: Revise ImapIdleChannelAdapter logic
Fixes spring-projects#8577 When we process mail messages in async manner, it is possible that we end up in a race condition situation where the next idle cycle closes the folder. It is possible to reopen the folder, but feels better to block the current idle cycle until we are done with the message and therefore keep folder opened. * Deprecate `ImapIdleChannelAdapter.sendingTaskExecutor` in favor of an `ExecutorChannel` as an output for this channel adapter or similar async hand-off downstream. * Make use of `shouldReconnectAutomatically` as it is advertised for this channel adapter * Optimize the proxy creation for message sending task
1 parent 775e6fd commit efb1972

File tree

9 files changed

+114
-84
lines changed

9 files changed

+114
-84
lines changed

spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java

Lines changed: 87 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,13 +20,12 @@
2020
import java.time.Instant;
2121
import java.util.List;
2222
import java.util.concurrent.Executor;
23-
import java.util.concurrent.ExecutorService;
24-
import java.util.concurrent.Executors;
2523
import java.util.concurrent.ScheduledFuture;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.function.Consumer;
2626

2727
import jakarta.mail.Folder;
2828
import jakarta.mail.Message;
29-
import jakarta.mail.MessagingException;
3029
import org.aopalliance.aop.Advice;
3130

3231
import org.springframework.aop.framework.ProxyFactory;
@@ -38,6 +37,7 @@
3837
import org.springframework.integration.transaction.IntegrationResourceHolder;
3938
import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization;
4039
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
40+
import org.springframework.messaging.MessagingException;
4141
import org.springframework.scheduling.TaskScheduler;
4242
import org.springframework.scheduling.Trigger;
4343
import org.springframework.scheduling.TriggerContext;
@@ -78,12 +78,10 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be
7878

7979
private boolean shouldReconnectAutomatically = true;
8080

81-
private Executor sendingTaskExecutor = Executors.newFixedThreadPool(1);
82-
83-
private boolean sendingTaskExecutorSet;
84-
8581
private List<Advice> adviceChain;
8682

83+
private Consumer<Object> messageSender;
84+
8785
private long reconnectDelay = DEFAULT_RECONNECT_DELAY; // milliseconds
8886

8987
private volatile ScheduledFuture<?> receivingTask;
@@ -104,20 +102,19 @@ public void setAdviceChain(List<Advice> adviceChain) {
104102
}
105103

106104
/**
107-
* Specify an {@link Executor} used to send messages received by the
108-
* adapter.
105+
* Specify an {@link Executor} used to send messages received by the adapter.
109106
* @param sendingTaskExecutor the sendingTaskExecutor to set
107+
* @deprecated since 6.1 in favor of async hands-off downstream in the flow,
108+
* e.g. {@link org.springframework.integration.channel.ExecutorChannel}.
110109
*/
110+
@Deprecated(since = "6.1", forRemoval = true)
111111
public void setSendingTaskExecutor(Executor sendingTaskExecutor) {
112-
Assert.notNull(sendingTaskExecutor, "'sendingTaskExecutor' must not be null");
113-
this.sendingTaskExecutor = sendingTaskExecutor;
114-
this.sendingTaskExecutorSet = true;
115112
}
116113

117114
/**
118115
* Specify whether the IDLE task should reconnect automatically after
119-
* catching a {@link jakarta.mail.FolderClosedException} while waiting for messages. The
120-
* default value is <code>true</code>.
116+
* catching a {@link jakarta.mail.MessagingException} while waiting for messages. The
117+
* default value is true.
121118
* @param shouldReconnectAutomatically true to reconnect.
122119
*/
123120
public void setShouldReconnectAutomatically(boolean shouldReconnectAutomatically) {
@@ -148,6 +145,26 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
148145
this.applicationEventPublisher = applicationEventPublisher;
149146
}
150147

148+
@Override
149+
@SuppressWarnings("unchecked")
150+
protected void onInit() {
151+
super.onInit();
152+
153+
Consumer<?> messageSenderToUse = new MessageSender();
154+
155+
if (!CollectionUtils.isEmpty(this.adviceChain)) {
156+
ProxyFactory proxyFactory = new ProxyFactory(messageSenderToUse);
157+
this.adviceChain.forEach(proxyFactory::addAdvice);
158+
for (Advice advice : this.adviceChain) {
159+
proxyFactory.addAdvice(advice);
160+
}
161+
messageSenderToUse = (Consumer<?>) proxyFactory.getProxy(this.classLoader);
162+
}
163+
164+
this.messageSender = (Consumer<Object>) messageSenderToUse;
165+
}
166+
167+
151168
/*
152169
* Lifecycle implementation
153170
*/
@@ -170,69 +187,53 @@ protected void doStop() {
170187
public void destroy() {
171188
super.destroy();
172189
this.mailReceiver.destroy();
173-
// If we're running with the default executor, shut it down.
174-
if (!this.sendingTaskExecutorSet && this.sendingTaskExecutor != null) {
175-
((ExecutorService) this.sendingTaskExecutor).shutdown();
190+
}
191+
192+
private void publishException(Exception ex) {
193+
if (this.applicationEventPublisher != null) {
194+
this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex));
195+
}
196+
else {
197+
logger.debug(() -> "No application event publisher for exception: " + ex.getMessage());
176198
}
177199
}
178200

179-
private Runnable createMessageSendingTask(Object mailMessage) {
180-
Runnable sendingTask = prepareSendingTask(mailMessage);
201+
private class MessageSender implements Consumer<Object> {
181202

182-
// wrap in the TX proxy if necessary
183-
if (!CollectionUtils.isEmpty(this.adviceChain)) {
184-
ProxyFactory proxyFactory = new ProxyFactory(sendingTask);
185-
if (!CollectionUtils.isEmpty(this.adviceChain)) {
186-
for (Advice advice : this.adviceChain) {
187-
proxyFactory.addAdvice(advice);
188-
}
189-
}
190-
sendingTask = (Runnable) proxyFactory.getProxy(this.classLoader);
203+
MessageSender() {
191204
}
192-
return sendingTask;
193-
}
194205

195-
private Runnable prepareSendingTask(Object mailMessage) {
196-
return () -> {
197-
@SuppressWarnings("unchecked")
198-
org.springframework.messaging.Message<?> message =
206+
@Override
207+
public void accept(Object mailMessage) {
208+
org.springframework.messaging.Message<?> messageToSend =
199209
mailMessage instanceof Message
200210
? getMessageBuilderFactory().withPayload(mailMessage).build()
201-
: (org.springframework.messaging.Message<Object>) mailMessage;
211+
: (org.springframework.messaging.Message<?>) mailMessage;
202212

203213
if (TransactionSynchronizationManager.isActualTransactionActive()
204-
&& this.transactionSynchronizationFactory != null) {
214+
&& ImapIdleChannelAdapter.this.transactionSynchronizationFactory != null) {
205215

206-
TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(this);
216+
TransactionSynchronization synchronization =
217+
ImapIdleChannelAdapter.this.transactionSynchronizationFactory.create(this);
207218
if (synchronization != null) {
208219
TransactionSynchronizationManager.registerSynchronization(synchronization);
209220

210-
if (synchronization instanceof IntegrationResourceHolderSynchronization
221+
if (synchronization instanceof IntegrationResourceHolderSynchronization integrationSync
211222
&& !TransactionSynchronizationManager.hasResource(this)) {
212223

213-
TransactionSynchronizationManager.bindResource(this,
214-
((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder());
224+
TransactionSynchronizationManager.bindResource(this, integrationSync.getResourceHolder());
215225
}
216226

217227
Object resourceHolder = TransactionSynchronizationManager.getResource(this);
218-
if (resourceHolder instanceof IntegrationResourceHolder) {
219-
((IntegrationResourceHolder) resourceHolder).setMessage(message);
228+
if (resourceHolder instanceof IntegrationResourceHolder integrationResourceHolder) {
229+
integrationResourceHolder.setMessage(messageToSend);
220230
}
221231
}
222232
}
223-
sendMessage(message);
224-
};
225-
}
226-
227-
private void publishException(Exception ex) {
228-
if (this.applicationEventPublisher != null) {
229-
this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex));
233+
sendMessage(messageToSend);
230234
}
231-
else {
232-
logger.debug(() -> "No application event publisher for exception: " + ex.getMessage());
233-
}
234-
}
235235

236+
}
236237

237238
private class ReceivingTask implements Runnable {
238239

@@ -246,10 +247,23 @@ public void run() {
246247
ImapIdleChannelAdapter.this.idleTask.run();
247248
logger.debug("Task completed successfully. Re-scheduling it again right away.");
248249
}
249-
catch (Exception ex) { //run again after a delay
250-
logger.warn(ex, () -> "Failed to execute IDLE task. Will attempt to resubmit in "
251-
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
252-
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
250+
catch (Exception ex) {
251+
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically
252+
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {
253+
254+
//run again after a delay
255+
logger.info(messagingException,
256+
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
257+
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
258+
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
259+
}
260+
else {
261+
logger.warn(ex,
262+
"Failed to execute IDLE task. " +
263+
"Won't resubmit since not a 'shouldReconnectAutomatically'" +
264+
"or not a 'jakarta.mail.MessagingException'");
265+
ImapIdleChannelAdapter.this.receivingTaskTrigger.stop();
266+
}
253267
publishException(ex);
254268
}
255269
}
@@ -274,21 +288,19 @@ public void run() {
274288
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
275289
logger.debug(() -> "received " + mailMessages.length + " mail messages");
276290
for (Object mailMessage : mailMessages) {
277-
Runnable messageSendingTask = createMessageSendingTask(mailMessage);
278291
if (isRunning()) {
279-
ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask);
292+
ImapIdleChannelAdapter.this.messageSender.accept(mailMessage);
280293
}
281294
}
282295
}
283296
}
284-
catch (MessagingException ex) {
297+
catch (jakarta.mail.MessagingException ex) {
285298
logger.warn(ex, "error occurred in idle task");
286299
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
287300
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
288301
}
289302
else {
290-
throw new org.springframework.messaging.MessagingException(
291-
"Failure in 'idle' task. Will NOT resubmit.", ex);
303+
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
292304
}
293305
}
294306
}
@@ -298,16 +310,20 @@ public void run() {
298310

299311
private class ExceptionAwarePeriodicTrigger implements Trigger {
300312

301-
private volatile boolean delayNextExecution;
313+
private final AtomicBoolean delayNextExecution = new AtomicBoolean();
314+
315+
private final AtomicBoolean stop = new AtomicBoolean();
302316

303317

304318
ExceptionAwarePeriodicTrigger() {
305319
}
306320

307321
@Override
308322
public Instant nextExecution(TriggerContext triggerContext) {
309-
if (this.delayNextExecution) {
310-
this.delayNextExecution = false;
323+
if (this.stop.getAndSet(false)) {
324+
return null;
325+
}
326+
if (this.delayNextExecution.getAndSet(false)) {
311327
return Instant.now().plusMillis(ImapIdleChannelAdapter.this.reconnectDelay);
312328
}
313329
else {
@@ -316,7 +332,11 @@ public Instant nextExecution(TriggerContext triggerContext) {
316332
}
317333

318334
void delayNextExecution() {
319-
this.delayNextExecution = true;
335+
this.delayNextExecution.set(true);
336+
}
337+
338+
void stop() {
339+
this.stop.set(true);
320340
}
321341

322342
}

spring-integration-mail/src/main/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParser.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -54,8 +54,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
5454
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, txElement,
5555
"synchronization-factory", "transactionSynchronizationFactory");
5656
}
57-
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "task-executor",
58-
"sendingTaskExecutor");
5957
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
6058
IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(null,
6159
DomUtils.getChildElementByTagName(element, "transactional"), beanDefinition, parserContext);

spring-integration-mail/src/main/java/org/springframework/integration/mail/dsl/ImapIdleChannelAdapterSpec.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2022 the original author or authors.
2+
* Copyright 2014-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -352,10 +352,11 @@ public ImapIdleChannelAdapterSpec transactional() {
352352
* Specify a task executor to be used to send messages to the downstream flow.
353353
* @param sendingTaskExecutor the sendingTaskExecutor.
354354
* @return the spec.
355-
* @see ImapIdleChannelAdapter#setSendingTaskExecutor(Executor)
355+
* @deprecated since 6.1 in favor of async hands-off downstream in the flow,
356+
* e.g. {@link org.springframework.integration.channel.ExecutorChannel}.
356357
*/
358+
@Deprecated(since = "6.1", forRemoval = true)
357359
public ImapIdleChannelAdapterSpec sendingTaskExecutor(Executor sendingTaskExecutor) {
358-
this.target.setSendingTaskExecutor(sendingTaskExecutor);
359360
return this;
360361
}
361362

spring-integration-mail/src/main/resources/org/springframework/integration/mail/config/spring-integration-mail.xsd

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,11 @@
167167
<xsd:attribute name="task-executor" type="xsd:string">
168168
<xsd:annotation>
169169
<xsd:documentation><![CDATA[
170-
Reference to a bean that implements
170+
[DEPRECATED] Reference to a bean that implements
171171
org.springframework.core.task.TaskExecutor which is used
172172
to send Messages received by this adapter.
173173
If not provided, the adapter uses a single-threaded executor.
174+
Deprecated since 6.1 in favor of async hand-off downstream in the flow.
174175
]]></xsd:documentation>
175176
<xsd:appinfo>
176177
<tool:annotation kind="ref">

spring-integration-mail/src/test/java/org/springframework/integration/mail/ImapMailReceiverTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -209,6 +209,7 @@ public void testIdleWithServerGuts(ImapMailReceiver receiver, boolean mapped, bo
209209
adapter.setOutputChannel(channel);
210210
adapter.setTaskScheduler(taskScheduler);
211211
adapter.setReconnectDelay(1);
212+
adapter.afterPropertiesSet();
212213
adapter.start();
213214
MimeMessage message =
214215
GreenMailUtil.createTextEmail("Foo <foo@bar>", "Bar <bar@baz>", "Test Email", "foo\r\n",
@@ -698,6 +699,7 @@ public void testInitialIdleDelayWhenRecentIsSupported() throws Exception {
698699
QueueChannel channel = new QueueChannel();
699700
adapter.setOutputChannel(channel);
700701
adapter.setReconnectDelay(1);
702+
adapter.afterPropertiesSet();
701703

702704
ImapMailReceiver receiver = new ImapMailReceiver("imap:foo");
703705
receiver.setCancelIdleInterval(1);

spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests-context.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,7 @@
7878
store-uri="imap:foo"
7979
channel="channel"
8080
auto-startup="false"
81-
should-delete-messages="true"
82-
task-executor="executor">
81+
should-delete-messages="true">
8382
<mail:transactional synchronization-factory="syncFactory" />
8483
</mail:imap-idle-channel-adapter>
8584

spring-integration-mail/src/test/java/org/springframework/integration/mail/config/ImapIdleChannelAdapterParserTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -194,7 +194,6 @@ public void transactionalAdapter() {
194194
assertThat(receiverAccessor.getPropertyValue("shouldDeleteMessages")).isEqualTo(Boolean.TRUE);
195195
assertThat(receiverAccessor.getPropertyValue("shouldMarkMessagesAsRead")).isEqualTo(Boolean.TRUE);
196196
assertThat(adapterAccessor.getPropertyValue("errorChannel")).isNull();
197-
assertThat(adapterAccessor.getPropertyValue("sendingTaskExecutor")).isEqualTo(context.getBean("executor"));
198197
assertThat(adapterAccessor.getPropertyValue("adviceChain")).isNotNull();
199198
}
200199

0 commit comments

Comments
 (0)