Skip to content

GH-8577: Revise ImapIdleChannelAdapter logic #8588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,13 +20,12 @@
import java.time.Instant;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import jakarta.mail.Folder;
import jakarta.mail.Message;
import jakarta.mail.MessagingException;
import org.aopalliance.aop.Advice;

import org.springframework.aop.framework.ProxyFactory;
Expand All @@ -38,6 +37,7 @@
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
Expand Down Expand Up @@ -78,12 +78,10 @@ public class ImapIdleChannelAdapter extends MessageProducerSupport implements Be

private boolean shouldReconnectAutomatically = true;

private Executor sendingTaskExecutor = Executors.newFixedThreadPool(1);

private boolean sendingTaskExecutorSet;

private List<Advice> adviceChain;

private Consumer<Object> messageSender;

private long reconnectDelay = DEFAULT_RECONNECT_DELAY; // milliseconds

private volatile ScheduledFuture<?> receivingTask;
Expand All @@ -104,20 +102,19 @@ public void setAdviceChain(List<Advice> adviceChain) {
}

/**
* Specify an {@link Executor} used to send messages received by the
* adapter.
* Specify an {@link Executor} used to send messages received by the adapter.
* @param sendingTaskExecutor the sendingTaskExecutor to set
* @deprecated since 6.1 in favor of async hands-off downstream in the flow,
* e.g. {@link org.springframework.integration.channel.ExecutorChannel}.
*/
@Deprecated(since = "6.1", forRemoval = true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to deprecate in 6.0 and just remove here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd love to, but that is a behavior change and making it deprecated there would just lead to always use a default one.
There is a simple workaround with setting a SyncTaskExecutor instead.
Plus it is not too far from 6.2 this Fall where we indeed will remove it already.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just bite the bullet and make it a breaking change. Deprecating implies that it will still work, but we don't encourage the use.

public void setSendingTaskExecutor(Executor sendingTaskExecutor) {
Assert.notNull(sendingTaskExecutor, "'sendingTaskExecutor' must not be null");
this.sendingTaskExecutor = sendingTaskExecutor;
this.sendingTaskExecutorSet = true;
}

/**
* Specify whether the IDLE task should reconnect automatically after
* catching a {@link jakarta.mail.FolderClosedException} while waiting for messages. The
* default value is <code>true</code>.
* catching a {@link jakarta.mail.MessagingException} while waiting for messages. The
* default value is true.
* @param shouldReconnectAutomatically true to reconnect.
*/
public void setShouldReconnectAutomatically(boolean shouldReconnectAutomatically) {
Expand Down Expand Up @@ -148,6 +145,26 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
this.applicationEventPublisher = applicationEventPublisher;
}

@Override
@SuppressWarnings("unchecked")
protected void onInit() {
super.onInit();

Consumer<?> messageSenderToUse = new MessageSender();

if (!CollectionUtils.isEmpty(this.adviceChain)) {
ProxyFactory proxyFactory = new ProxyFactory(messageSenderToUse);
this.adviceChain.forEach(proxyFactory::addAdvice);
for (Advice advice : this.adviceChain) {
proxyFactory.addAdvice(advice);
}
messageSenderToUse = (Consumer<?>) proxyFactory.getProxy(this.classLoader);
}

this.messageSender = (Consumer<Object>) messageSenderToUse;
}


/*
* Lifecycle implementation
*/
Expand All @@ -170,69 +187,53 @@ protected void doStop() {
public void destroy() {
super.destroy();
this.mailReceiver.destroy();
// If we're running with the default executor, shut it down.
if (!this.sendingTaskExecutorSet && this.sendingTaskExecutor != null) {
((ExecutorService) this.sendingTaskExecutor).shutdown();
}

private void publishException(Exception ex) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex));
}
else {
logger.debug(() -> "No application event publisher for exception: " + ex.getMessage());
}
}

private Runnable createMessageSendingTask(Object mailMessage) {
Runnable sendingTask = prepareSendingTask(mailMessage);
private class MessageSender implements Consumer<Object> {

// wrap in the TX proxy if necessary
if (!CollectionUtils.isEmpty(this.adviceChain)) {
ProxyFactory proxyFactory = new ProxyFactory(sendingTask);
if (!CollectionUtils.isEmpty(this.adviceChain)) {
for (Advice advice : this.adviceChain) {
proxyFactory.addAdvice(advice);
}
}
sendingTask = (Runnable) proxyFactory.getProxy(this.classLoader);
MessageSender() {
}
return sendingTask;
}

private Runnable prepareSendingTask(Object mailMessage) {
return () -> {
@SuppressWarnings("unchecked")
org.springframework.messaging.Message<?> message =
@Override
public void accept(Object mailMessage) {
org.springframework.messaging.Message<?> messageToSend =
mailMessage instanceof Message
? getMessageBuilderFactory().withPayload(mailMessage).build()
: (org.springframework.messaging.Message<Object>) mailMessage;
: (org.springframework.messaging.Message<?>) mailMessage;

if (TransactionSynchronizationManager.isActualTransactionActive()
&& this.transactionSynchronizationFactory != null) {
&& ImapIdleChannelAdapter.this.transactionSynchronizationFactory != null) {

TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(this);
TransactionSynchronization synchronization =
ImapIdleChannelAdapter.this.transactionSynchronizationFactory.create(this);
if (synchronization != null) {
TransactionSynchronizationManager.registerSynchronization(synchronization);

if (synchronization instanceof IntegrationResourceHolderSynchronization
if (synchronization instanceof IntegrationResourceHolderSynchronization integrationSync
&& !TransactionSynchronizationManager.hasResource(this)) {

TransactionSynchronizationManager.bindResource(this,
((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder());
TransactionSynchronizationManager.bindResource(this, integrationSync.getResourceHolder());
}

Object resourceHolder = TransactionSynchronizationManager.getResource(this);
if (resourceHolder instanceof IntegrationResourceHolder) {
((IntegrationResourceHolder) resourceHolder).setMessage(message);
if (resourceHolder instanceof IntegrationResourceHolder integrationResourceHolder) {
integrationResourceHolder.setMessage(messageToSend);
}
}
}
sendMessage(message);
};
}

private void publishException(Exception ex) {
if (this.applicationEventPublisher != null) {
this.applicationEventPublisher.publishEvent(new ImapIdleExceptionEvent(ex));
sendMessage(messageToSend);
}
else {
logger.debug(() -> "No application event publisher for exception: " + ex.getMessage());
}
}

}

private class ReceivingTask implements Runnable {

Expand All @@ -246,10 +247,23 @@ public void run() {
ImapIdleChannelAdapter.this.idleTask.run();
logger.debug("Task completed successfully. Re-scheduling it again right away.");
}
catch (Exception ex) { //run again after a delay
logger.warn(ex, () -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
catch (Exception ex) {
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically
&& ex.getCause() instanceof jakarta.mail.MessagingException messagingException) {

//run again after a delay
logger.info(messagingException,
() -> "Failed to execute IDLE task. Will attempt to resubmit in "
+ ImapIdleChannelAdapter.this.reconnectDelay + " milliseconds.");
ImapIdleChannelAdapter.this.receivingTaskTrigger.delayNextExecution();
}
else {
logger.warn(ex,
"Failed to execute IDLE task. " +
"Won't resubmit since not a 'shouldReconnectAutomatically'" +
"or not a 'jakarta.mail.MessagingException'");
ImapIdleChannelAdapter.this.receivingTaskTrigger.stop();
}
publishException(ex);
}
}
Expand All @@ -274,21 +288,19 @@ public void run() {
Object[] mailMessages = ImapIdleChannelAdapter.this.mailReceiver.receive();
logger.debug(() -> "received " + mailMessages.length + " mail messages");
for (Object mailMessage : mailMessages) {
Runnable messageSendingTask = createMessageSendingTask(mailMessage);
if (isRunning()) {
ImapIdleChannelAdapter.this.sendingTaskExecutor.execute(messageSendingTask);
ImapIdleChannelAdapter.this.messageSender.accept(mailMessage);
}
}
}
}
catch (MessagingException ex) {
catch (jakarta.mail.MessagingException ex) {
logger.warn(ex, "error occurred in idle task");
if (ImapIdleChannelAdapter.this.shouldReconnectAutomatically) {
throw new IllegalStateException("Failure in 'idle' task. Will resubmit.", ex);
}
else {
throw new org.springframework.messaging.MessagingException(
"Failure in 'idle' task. Will NOT resubmit.", ex);
throw new MessagingException("Failure in 'idle' task. Will NOT resubmit.", ex);
}
}
}
Expand All @@ -298,16 +310,20 @@ public void run() {

private class ExceptionAwarePeriodicTrigger implements Trigger {

private volatile boolean delayNextExecution;
private final AtomicBoolean delayNextExecution = new AtomicBoolean();

private final AtomicBoolean stop = new AtomicBoolean();


ExceptionAwarePeriodicTrigger() {
}

@Override
public Instant nextExecution(TriggerContext triggerContext) {
if (this.delayNextExecution) {
this.delayNextExecution = false;
if (this.stop.getAndSet(false)) {
return null;
}
if (this.delayNextExecution.getAndSet(false)) {
return Instant.now().plusMillis(ImapIdleChannelAdapter.this.reconnectDelay);
}
else {
Expand All @@ -316,7 +332,11 @@ public Instant nextExecution(TriggerContext triggerContext) {
}

void delayNextExecution() {
this.delayNextExecution = true;
this.delayNextExecution.set(true);
}

void stop() {
this.stop.set(true);
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,8 +54,6 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, txElement,
"synchronization-factory", "transactionSynchronizationFactory");
}
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element, "task-executor",
"sendingTaskExecutor");
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
IntegrationNamespaceUtils.configureAndSetAdviceChainIfPresent(null,
DomUtils.getChildElementByTagName(element, "transactional"), beanDefinition, parserContext);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -352,10 +352,11 @@ public ImapIdleChannelAdapterSpec transactional() {
* Specify a task executor to be used to send messages to the downstream flow.
* @param sendingTaskExecutor the sendingTaskExecutor.
* @return the spec.
* @see ImapIdleChannelAdapter#setSendingTaskExecutor(Executor)
* @deprecated since 6.1 in favor of async hands-off downstream in the flow,
* e.g. {@link org.springframework.integration.channel.ExecutorChannel}.
*/
@Deprecated(since = "6.1", forRemoval = true)
public ImapIdleChannelAdapterSpec sendingTaskExecutor(Executor sendingTaskExecutor) {
this.target.setSendingTaskExecutor(sendingTaskExecutor);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,11 @@
<xsd:attribute name="task-executor" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Reference to a bean that implements
[DEPRECATED] Reference to a bean that implements
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment re: deprecation; I don't see how we can deprecate it, but just ignore it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do ignore it from the parser.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw that; but it makes no sense to me; if the parser ignores it, we should remove it altogether (and record it as a breaking change in the migration guide).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Let me issue an new PR against 6.0.x to just deprecate this executor.
Then I'll fix this as a removal.
And yes: in the end a dedicated migration guide note.

org.springframework.core.task.TaskExecutor which is used
to send Messages received by this adapter.
If not provided, the adapter uses a single-threaded executor.
Deprecated since 6.1 in favor of async hand-off downstream in the flow.
]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -209,6 +209,7 @@ public void testIdleWithServerGuts(ImapMailReceiver receiver, boolean mapped, bo
adapter.setOutputChannel(channel);
adapter.setTaskScheduler(taskScheduler);
adapter.setReconnectDelay(1);
adapter.afterPropertiesSet();
adapter.start();
MimeMessage message =
GreenMailUtil.createTextEmail("Foo <foo@bar>", "Bar <bar@baz>", "Test Email", "foo\r\n",
Expand Down Expand Up @@ -698,6 +699,7 @@ public void testInitialIdleDelayWhenRecentIsSupported() throws Exception {
QueueChannel channel = new QueueChannel();
adapter.setOutputChannel(channel);
adapter.setReconnectDelay(1);
adapter.afterPropertiesSet();

ImapMailReceiver receiver = new ImapMailReceiver("imap:foo");
receiver.setCancelIdleInterval(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@
store-uri="imap:foo"
channel="channel"
auto-startup="false"
should-delete-messages="true"
task-executor="executor">
should-delete-messages="true">
<mail:transactional synchronization-factory="syncFactory" />
</mail:imap-idle-channel-adapter>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -194,7 +194,6 @@ public void transactionalAdapter() {
assertThat(receiverAccessor.getPropertyValue("shouldDeleteMessages")).isEqualTo(Boolean.TRUE);
assertThat(receiverAccessor.getPropertyValue("shouldMarkMessagesAsRead")).isEqualTo(Boolean.TRUE);
assertThat(adapterAccessor.getPropertyValue("errorChannel")).isNull();
assertThat(adapterAccessor.getPropertyValue("sendingTaskExecutor")).isEqualTo(context.getBean("executor"));
assertThat(adapterAccessor.getPropertyValue("adviceChain")).isNotNull();
}

Expand Down
Loading