Skip to content

GH-8577: Revise ImapIdleChannelAdapter logic #8588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,14 +19,12 @@
import java.io.Serial;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import jakarta.mail.Folder;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import org.aopalliance.aop.Advice;

import org.springframework.aop.framework.ProxyFactory;
Expand All @@ -38,6 +36,7 @@
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
Expand Down Expand Up @@ -78,12 +77,10 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be

private boolean shouldReconnectAutomatically = true;

private Executor sendingTaskExecutor = Executors.newFixedThreadPool(1);

private boolean sendingTaskExecutorSet;

private List<Advice> adviceChain;

private Consumer<Object> messageSender;

private long reconnectDelay = DEFAULT_RECONNECT_DELAY; // milliseconds

private volatile ScheduledFuture<?> receivingTask;
Expand All @@ -103,21 +100,10 @@ public void setAdviceChain(List<Advice> adviceChain) {
this.adviceChain = adviceChain;
}

/**
* Specify an {@link Executor} used to send messages received by the
* adapter.
* @param sendingTaskExecutor the sendingTaskExecutor to set
*/
public void setSendingTaskExecutor(Executor sendingTaskExecutor) {
Assert.notNull(sendingTaskExecutor, "'sendingTaskExecutor' must not be null");
this.sendingTaskExecutor = sendingTaskExecutor;
this.sendingTaskExecutorSet = true;
}

/**
* Specify whether the IDLE task should reconnect automatically after
* catching a {@link jakarta.mail.FolderClosedException} while waiting for messages. The
* default value is <code>true</code>.
* catching a {@link jakarta.mail.MessagingException} while waiting for messages. The
* default value is true.
* @param shouldReconnectAutomatically true to reconnect.
*/
public void setShouldReconnectAutomatically(boolean shouldReconnectAutomatically) {
Expand Down Expand Up @@ -148,6 +134,26 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
@SuppressWarnings("unchecked")
protected void onInit() {
super.onInit();

Consumer<?> messageSenderToUse = new MessageSender();

if (!CollectionUtils.isEmpty(this.adviceChain)) {
ProxyFactory proxyFactory = new ProxyFactory(messageSenderToUse);
this.adviceChain.forEach(proxyFactory::addAdvice);
for (Advice advice : this.adviceChain) {
proxyFactory.addAdvice(advice);
}
messageSenderToUse = (Consumer<?>) proxyFactory.getProxy(this.classLoader);
}

this.messageSender = (Consumer<Object>) messageSenderToUse;
}


/*
* Lifecycle implementation
*/
Expand All @@ -162,77 +168,64 @@ protected void doStart() {
@Override
// guarded by super#lifecycleLock
protected void doStop() {
this.receivingTask.cancel(true);
if (this.receivingTask != null) {
this.receivingTask.cancel(true);
this.receivingTask = null;
}
this.mailReceiver.cancelPing();
}

@Override
public void destroy() {
super.destroy();
this.mailReceiver.destroy();
// If we're running with the default executor, shut it down.
if (!this.sendingTaskExecutorSet && this.sendingTaskExecutor != null) {
((ExecutorService) this.sendingTaskExecutor).shutdown();
}

private void publishException(Exception ex) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex));
}
else {
logger.debug(() -> "No application event publisher for exception: " + ex.getMessage());
}
}

private Runnable createMessageSendingTask(Object mailMessage) {
Runnable sendingTask = prepareSendingTask(mailMessage);
private class MessageSender implements Consumer<Object> {

// wrap in the TX proxy if necessary
if (!CollectionUtils.isEmpty(this.adviceChain)) {
ProxyFactory proxyFactory = new ProxyFactory(sendingTask);
if (!CollectionUtils.isEmpty(this.adviceChain)) {
for (Advice advice : this.adviceChain) {
proxyFactory.addAdvice(advice);
}
}
sendingTask = (Runnable) proxyFactory.getProxy(this.classLoader);
MessageSender() {
}
return sendingTask;
}

private Runnable prepareSendingTask(Object mailMessage) {
return () -> {
@SuppressWarnings("unchecked")
org.springframework.messaging.Message<?> message =
@Override
public void accept(Object mailMessage) {
org.springframework.messaging.Message<?> messageToSend =
mailMessage instanceof Message
? getMessageBuilderFactory().withPayload(mailMessage).build()
: (org.springframework.messaging.Message<Object>) mailMessage;
: (org.springframework.messaging.Message<?>) mailMessage;

if (TransactionSynchronizationManager.isActualTransactionActive()
&& this.transactionSynchronizationFactory != null) {
&& ImapIdleChannelAdapter.this.transactionSynchronizationFactory != null) {

TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(this);
TransactionSynchronization synchronization =
ImapIdleChannelAdapter.this.transactionSynchronizationFactory.create(this);
if (synchronization != null) {
TransactionSynchronizationManager.registerSynchronization(synchronization);

if (synchronization instanceof IntegrationResourceHolderSynchronization
if (synchronization instanceof IntegrationResourceHolderSynchronization integrationSync
&& !TransactionSynchronizationManager.hasResource(this)) {

TransactionSynchronizationManager.bindResource(this,
((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder());
TransactionSynchronizationManager.bindResource(this, integrationSync.getResourceHolder());
}

Object resourceHolder = TransactionSynchronizationManager.getResource(this);
if (resourceHolder instanceof IntegrationResourceHolder) {
((IntegrationResourceHolder) resourceHolder).setMessage(message);
if (resourceHolder instanceof IntegrationResourceHolder integrationResourceHolder) {
integrationResourceHolder.setMessage(messageToSend);
}
}
}
sendMessage(message);
};
}

private void publishException(Exception ex) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex));
sendMessage(messageToSend);
}
else {
logger.debug(() -> "No application event publisher for exception: " + ex.getMessage());
}
}

}

private class ReceivingTask implements Runnable {

Expand All @@ -246,10 +239,23 @@ public void run() {
ImapIdleChannelAdapter.this.idleTask.run();
logger.debug("Task completed successfully. Re-scheduling it again right away.");
}
catch (Exception ex) { //run again after a delay
logger.warn(ex, () -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
catch (Exception ex) {
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {

//run again after a delay
logger.info(messagingException,
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
}
else {
logger.warn(ex,
"Failed to execute IDLE task. " +
"Won't resubmit since not a 'shouldReconnectAutomatically'" +
"or not a 'jakarta.mail.MessagingException'");
ImapIdleChannelAdapter.this.receivingTaskTrigger.stop();
}
publishException(ex);
}
}
Expand All @@ -274,21 +280,19 @@ public void run() {
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
logger.debug(() -> "received " + mailMessages.length + " mail messages");
for (Object mailMessage : mailMessages) {
Runnable messageSendingTask = createMessageSendingTask(mailMessage);
if (isRunning()) {
ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask);
ImapIdleChannelAdapter.this.messageSender.accept(mailMessage);
}
}
}
}
catch (MessagingException ex) {
catch (jakarta.mail.MessagingException ex) {
logger.warn(ex, "error occurred in idle task");
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
}
else {
throw new org.springframework.messaging.MessagingException(
"Failure in 'idle' task. Will NOT resubmit.", ex);
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
}
}
}
Expand All @@ -298,16 +302,20 @@ public void run() {

private class ExceptionAwarePeriodicTrigger implements Trigger {

private volatile boolean delayNextExecution;
private final AtomicBoolean delayNextExecution = new AtomicBoolean();

private final AtomicBoolean stop = new AtomicBoolean();


ExceptionAwarePeriodicTrigger() {
}

@Override
public Instant nextExecution(TriggerContext triggerContext) {
if (this.delayNextExecution) {
this.delayNextExecution = false;
if (this.stop.getAndSet(false)) {
return null;
}
if (this.delayNextExecution.getAndSet(false)) {
return Instant.now().plusMillis(ImapIdleChannelAdapter.this.reconnectDelay);
}
else {
Expand All @@ -316,7 +324,11 @@ public Instant nextExecution(TriggerContext triggerContext) {
}

void delayNextExecution() {
this.delayNextExecution = true;
this.delayNextExecution.set(true);
}

void stop() {
this.stop.set(true);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,8 +54,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, txElement,
"synchronization-factory", "transactionSynchronizationFactory");
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "task-executor",
"sendingTaskExecutor");
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(null,
DomUtils.getChildElementByTagName(element, "transactional"), beanDefinition, parserContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

Expand Down Expand Up @@ -348,17 +347,6 @@ public ImapIdleChannelAdapterSpec transactional() {
return transactional(transactionInterceptor);
}

/**
* Specify a task executor to be used to send messages to the downstream flow.
* @param sendingTaskExecutor the sendingTaskExecutor.
* @return the spec.
* @see ImapIdleChannelAdapter#setSendingTaskExecutor(Executor)
*/
public ImapIdleChannelAdapterSpec sendingTaskExecutor(Executor sendingTaskExecutor) {
this.target.setSendingTaskExecutor(sendingTaskExecutor);
return this;
}

/**
* @param shouldReconnectAutomatically the shouldReconnectAutomatically.
* @return the spec.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,6 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="task-executor" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Reference to a bean that implements
org.springframework.core.task.TaskExecutor which is used
to send Messages received by this adapter.
If not provided, the adapter uses a single-threaded executor.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.core.task.TaskExecutor"/>
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="cancel-idle-interval" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Expand All @@ -197,7 +182,7 @@
<xsd:attribute name="store-uri" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
The URI for the Mail Store. Typically of the form: [pop3|imap]://user:password@host:port/INBOX
The URI for the Mail Store. Typically, of the form: [pop3|imap]://user:password@host:port/INBOX
If this is not provided, then the store will be retrieved via the no-arg Session.getStore()
instead of the Session.getStore(url) method.
]]></xsd:documentation>
Expand All @@ -206,7 +191,8 @@
<xsd:attribute name="mail-filter-expression" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Allows you to provide a SpEL expression which defines a fine grained filtering criteria for the mail messages to be processed by this adapter.
Allows you to provide a SpEL expression which defines a fine-grained
filtering criteria for the mail messages to be processed by this adapter.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
Expand Down
Loading