Skip to content

Commit b869666

Browse files
artembilangaryrussell
authored andcommitted
INT-4261: Do not Register Resource to TX Twice
JIRA: https://jira.spring.io/browse/INT-4261 When we have some `Advice` withing TX Advice which may perform `doPoll()` several times, we unconditionally call `transactionSynchronizationFactory.create(resource)`. With the out-of-the-box implementations `DefaultTransactionSynchronizationFactory` and `PassThroughTransactionSynchronizationFactory` we preform `TransactionSynchronizationManager.bindResource()`. If resource is already there, an `IllegalStateException` is thrown * Check that resource isn't bound already to the TX and don't create a new `TransactionalResourceSynchronization` - just return `null` * Check in the target users for the `null` before registering synchronization Move resource registration to TX outside of out-of-the-box factories * Fix condition in the `AbstractPollingEndpoint` for the resource * Increase responsiveness of TX test to decreasing `fixed-delay` and using `receive-timeout="-1"`
1 parent 1282cb5 commit b869666

File tree

8 files changed

+80
-49
lines changed

8 files changed

+80
-49
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -309,19 +309,33 @@ protected String getResourceKey() {
309309
private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Object resource) {
310310
if (this.transactionSynchronizationFactory != null && resource != null &&
311311
TransactionSynchronizationManager.isActualTransactionActive()) {
312+
312313
TransactionSynchronization synchronization = this.transactionSynchronizationFactory.create(resource);
313-
TransactionSynchronizationManager.registerSynchronization(synchronization);
314-
if (synchronization instanceof IntegrationResourceHolderSynchronization) {
315-
IntegrationResourceHolderSynchronization integrationSynchronization =
316-
((IntegrationResourceHolderSynchronization) synchronization);
317-
integrationSynchronization.setShouldUnbindAtCompletion(false);
318-
IntegrationResourceHolder resourceHolder = integrationSynchronization.getResourceHolder();
314+
if (synchronization != null) {
315+
TransactionSynchronizationManager.registerSynchronization(synchronization);
316+
317+
if (synchronization instanceof IntegrationResourceHolderSynchronization) {
318+
IntegrationResourceHolderSynchronization integrationSynchronization =
319+
((IntegrationResourceHolderSynchronization) synchronization);
320+
integrationSynchronization.setShouldUnbindAtCompletion(false);
321+
322+
if (!TransactionSynchronizationManager.hasResource(resource)) {
323+
TransactionSynchronizationManager.bindResource(resource,
324+
integrationSynchronization.getResourceHolder());
325+
}
326+
}
327+
}
328+
329+
Object resourceHolder = TransactionSynchronizationManager.getResource(resource);
330+
if (resourceHolder instanceof IntegrationResourceHolder) {
331+
IntegrationResourceHolder integrationResourceHolder = (IntegrationResourceHolder) resourceHolder;
319332
if (key != null) {
320-
resourceHolder.addAttribute(key, resource);
333+
integrationResourceHolder.addAttribute(key, resource);
321334
}
322-
return resourceHolder;
335+
return integrationResourceHolder;
323336
}
324337
}
338+
325339
return null;
326340
}
327341

spring-integration-core/src/main/java/org/springframework/integration/transaction/DefaultTransactionSynchronizationFactory.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,8 +20,8 @@
2020
import org.apache.commons.logging.LogFactory;
2121

2222
import org.springframework.transaction.support.TransactionSynchronization;
23-
import org.springframework.transaction.support.TransactionSynchronizationManager;
2423
import org.springframework.util.Assert;
24+
2525
/**
2626
* Default implementation of {@link TransactionSynchronizationFactory} which takes an instance of
2727
* {@link TransactionSynchronizationProcessor} allowing you to create a {@link TransactionSynchronization}
@@ -30,6 +30,7 @@
3030
* @author Gary Russell
3131
* @author Oleg Zhurakousky
3232
* @author Artem Bilan
33+
*
3334
* @since 2.2
3435
*/
3536
public class DefaultTransactionSynchronizationFactory implements TransactionSynchronizationFactory {
@@ -46,13 +47,9 @@ public DefaultTransactionSynchronizationFactory(TransactionSynchronizationProces
4647
@Override
4748
public TransactionSynchronization create(Object key) {
4849
Assert.notNull(key, "'key' must not be null");
49-
DefaultTransactionalResourceSynchronization synchronization = new DefaultTransactionalResourceSynchronization(key);
50-
TransactionSynchronizationManager.bindResource(key, synchronization.getResourceHolder());
51-
return synchronization;
50+
return new DefaultTransactionalResourceSynchronization(key);
5251
}
5352

54-
/**
55-
*/
5653
private final class DefaultTransactionalResourceSynchronization extends IntegrationResourceHolderSynchronization {
5754

5855
DefaultTransactionalResourceSynchronization(Object resourceKey) {

spring-integration-core/src/main/java/org/springframework/integration/transaction/PassThroughTransactionSynchronizationFactory.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,31 +17,24 @@
1717
package org.springframework.integration.transaction;
1818

1919
import org.springframework.transaction.support.TransactionSynchronization;
20-
import org.springframework.transaction.support.TransactionSynchronizationManager;
2120
import org.springframework.util.Assert;
2221

2322
/**
2423
* A simple {@link TransactionSynchronizationFactory} implementation which produces
25-
* an {@link IntegrationResourceHolderSynchronization} and registers
26-
* an {@link IntegrationResourceHolder} under the provided {@code key} with
27-
* the current transaction scope.
24+
* an {@link IntegrationResourceHolderSynchronization} with an {@link IntegrationResourceHolder}.
2825
*
2926
* @author Andreas Baer
27+
* @author Artem Bilan
3028
*
3129
* @since 5.0
32-
*
33-
* @see TransactionSynchronizationManager#bindResource(Object, Object)
3430
*/
3531
public class PassThroughTransactionSynchronizationFactory implements TransactionSynchronizationFactory {
3632

3733

3834
@Override
3935
public TransactionSynchronization create(Object key) {
4036
Assert.notNull(key, "'key' must not be null");
41-
IntegrationResourceHolderSynchronization synchronization =
42-
new IntegrationResourceHolderSynchronization(new IntegrationResourceHolder(), key);
43-
TransactionSynchronizationManager.bindResource(key, synchronization.getResourceHolder());
44-
return synchronization;
37+
return new IntegrationResourceHolderSynchronization(new IntegrationResourceHolder(), key);
4538
}
4639

4740
}

spring-integration-core/src/test/java/org/springframework/integration/channel/TransactionSynchronizationQueueChannelTests-context.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
</int:channel>
1111

1212
<int:service-activator input-channel="queueChannel" ref="service" method="handle">
13-
<int:poller fixed-delay="5000">
13+
<int:poller max-messages-per-poll="1" fixed-delay="1" receive-timeout="-1">
1414
<int:transactional synchronization-factory="txSyncFactory"/>
1515
</int:poller>
1616
</int:service-activator>
@@ -33,7 +33,7 @@
3333
</int:channel>
3434

3535
<int:service-activator input-channel="queueChannel2" ref="service" method="handle">
36-
<int:poller fixed-delay="5000">
36+
<int:poller max-messages-per-poll="1" fixed-delay="1" receive-timeout="-1">
3737
<int:transactional synchronization-factory="txSyncFactory2"/>
3838
</int:poller>
3939
</int:service-activator>

spring-integration-core/src/test/java/org/springframework/integration/channel/TransactionSynchronizationQueueChannelTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -32,16 +32,20 @@
3232
import org.springframework.messaging.PollableChannel;
3333
import org.springframework.messaging.support.GenericMessage;
3434
import org.springframework.integration.support.MessageBuilder;
35+
import org.springframework.test.annotation.DirtiesContext;
3536
import org.springframework.test.context.ContextConfiguration;
3637
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
3738

3839
/**
3940
* @author Gary Russell
41+
* @author Artem Bilan
42+
*
4043
* @since 2.2
4144
*
4245
*/
4346
@ContextConfiguration
4447
@RunWith(SpringJUnit4ClassRunner.class)
48+
@DirtiesContext
4549
public class TransactionSynchronizationQueueChannelTests {
4650

4751
@Autowired

spring-integration-core/src/test/java/org/springframework/integration/dispatcher/PollingTransactionTests.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616

1717
package org.springframework.integration.dispatcher;
1818

19+
import static org.hamcrest.Matchers.instanceOf;
1920
import static org.junit.Assert.assertEquals;
2021
import static org.junit.Assert.assertNotNull;
2122
import static org.junit.Assert.assertNull;
23+
import static org.junit.Assert.assertThat;
2224
import static org.junit.Assert.assertTrue;
2325

2426
import java.util.List;
@@ -30,8 +32,8 @@
3032
import org.junit.Test;
3133

3234
import org.springframework.aop.Advisor;
35+
import org.springframework.aop.ProxyMethodInvocation;
3336
import org.springframework.aop.framework.Advised;
34-
import org.springframework.aop.support.DefaultPointcutAdvisor;
3537
import org.springframework.context.support.ClassPathXmlApplicationContext;
3638
import org.springframework.integration.endpoint.PollingConsumer;
3739
import org.springframework.integration.test.util.TestUtils;
@@ -53,6 +55,7 @@
5355
* @author Oleg Zhurakousky
5456
* @author Gary Russell
5557
* @author Andreas Baer
58+
* @author Artem Bilan
5659
*/
5760
public class PollingTransactionTests {
5861

@@ -82,25 +85,23 @@ public void transactionWithCommitAndAdvices() throws InterruptedException {
8285
PollingConsumer advicedPoller = context.getBean("advicedSa", PollingConsumer.class);
8386

8487
List<Advice> adviceChain = TestUtils.getPropertyValue(advicedPoller, "adviceChain", List.class);
85-
assertEquals(3, adviceChain.size());
88+
assertEquals(4, adviceChain.size());
8689
Runnable poller = TestUtils.getPropertyValue(advicedPoller, "poller", Runnable.class);
8790
Callable<?> pollingTask = TestUtils.getPropertyValue(poller, "pollingTask", Callable.class);
8891
assertTrue("Poller is not Advised", pollingTask instanceof Advised);
8992
Advisor[] advisors = ((Advised) pollingTask).getAdvisors();
90-
assertEquals(3, advisors.length);
93+
assertEquals(4, advisors.length);
9194

92-
assertTrue("First advisor is not TX", ((DefaultPointcutAdvisor) advisors[0]).getAdvice() instanceof
93-
TransactionInterceptor);
95+
assertThat("First advisor is not TX", advisors[0].getAdvice(), instanceOf(TransactionInterceptor.class));
9496
TestTransactionManager txManager = (TestTransactionManager) context.getBean("txManager");
9597
MessageChannel input = (MessageChannel) context.getBean("goodInputWithAdvice");
9698
PollableChannel output = (PollableChannel) context.getBean("output");
9799
assertEquals(0, txManager.getCommitCount());
98100
assertEquals(0, txManager.getRollbackCount());
99-
input.send(new GenericMessage<String>("test"));
101+
input.send(new GenericMessage<>("test"));
100102
txManager.waitForCompletion(10000);
101103
Message<?> message = output.receive(0);
102104
assertNotNull(message);
103-
assertEquals(1, txManager.getCommitCount());
104105
assertEquals(0, txManager.getRollbackCount());
105106
context.close();
106107
}
@@ -220,7 +221,7 @@ public void commitFailureAndHandlerFailureTest() throws Throwable {
220221
PollableChannel output = (PollableChannel) context.getBean("output");
221222
PollableChannel errorChannel = (PollableChannel) context.getBean("errorChannel");
222223
assertEquals(0, txManager.getCommitCount());
223-
inputTxFail.send(new GenericMessage<>("commitFalilureTest"));
224+
inputTxFail.send(new GenericMessage<>("commitFailureTest"));
224225
Message<?> errorMessage = errorChannel.receive(10000);
225226
assertNotNull(errorMessage);
226227
Object payload = errorMessage.getPayload();
@@ -254,6 +255,17 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
254255

255256
}
256257

258+
public static class SimpleRepeatAdvice implements MethodInterceptor {
259+
260+
@Override
261+
public Object invoke(MethodInvocation invocation) throws Throwable {
262+
((ProxyMethodInvocation) invocation).invocableClone().proceed();
263+
264+
return invocation.proceed();
265+
}
266+
267+
}
268+
257269
@SuppressWarnings("serial")
258270
public static class FailingCommitTransactionManager extends TestTransactionManager {
259271

spring-integration-core/src/test/java/org/springframework/integration/dispatcher/transactionTests.xml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,25 +24,26 @@
2424

2525
<service-activator input-channel="badInput" ref="testBean"
2626
method="bad" output-channel="output">
27-
<poller max-messages-per-poll="1" fixed-rate="10000">
27+
<poller max-messages-per-poll="1" fixed-delay="1" receive-timeout="-1">
2828
<transactional transaction-manager="txManager" />
2929
</poller>
3030
</service-activator>
3131

3232
<service-activator input-channel="goodInput" ref="testBean"
3333
method="good" output-channel="output">
34-
<poller max-messages-per-poll="1" fixed-rate="10000">
34+
<poller max-messages-per-poll="1" fixed-delay="1" receive-timeout="-1">
3535
<transactional transaction-manager="txManager" />
3636
</poller>
3737
</service-activator>
3838

3939
<service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
4040
method="good" output-channel="output">
41-
<poller max-messages-per-poll="1" fixed-rate="10000">
41+
<poller max-messages-per-poll="1" fixed-delay="1" receive-timeout="-1">
4242
<advice-chain>
4343
<ref bean="txAdvise"/>
4444
<ref bean="adviceA" />
4545
<beans:bean class="org.springframework.integration.dispatcher.PollingTransactionTests.SampleAdvice"/>
46+
<beans:bean class="org.springframework.integration.dispatcher.PollingTransactionTests.SimpleRepeatAdvice"/>
4647
</advice-chain>
4748
</poller>
4849
</service-activator>

spring-integration-mail/src/main/java/org/springframework/integration/mail/ImapIdleChannelAdapter.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -154,7 +154,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
154154

155155
@Override // guarded by super#lifecycleLock
156156
protected void doStart() {
157-
final TaskScheduler scheduler = this.getTaskScheduler();
157+
final TaskScheduler scheduler = this.getTaskScheduler();
158158
Assert.notNull(scheduler, "'taskScheduler' must not be null");
159159
if (this.sendingTaskExecutor == null) {
160160
this.sendingTaskExecutor = Executors.newFixedThreadPool(1);
@@ -187,19 +187,29 @@ private Runnable createMessageSendingTask(final Object mailMessage) {
187187
@SuppressWarnings("unchecked")
188188
org.springframework.messaging.Message<?> message =
189189
mailMessage instanceof Message
190-
? ImapIdleChannelAdapter.this.getMessageBuilderFactory().withPayload(mailMessage).build()
191-
: (org.springframework.messaging.Message<Object>) mailMessage;
190+
? ImapIdleChannelAdapter.this.getMessageBuilderFactory().withPayload(mailMessage).build()
191+
: (org.springframework.messaging.Message<Object>) mailMessage;
192192

193193
if (TransactionSynchronizationManager.isActualTransactionActive()) {
194194
if (ImapIdleChannelAdapter.this.transactionSynchronizationFactory != null) {
195195
TransactionSynchronization synchronization =
196196
ImapIdleChannelAdapter.this.transactionSynchronizationFactory
197197
.create(ImapIdleChannelAdapter.this);
198-
TransactionSynchronizationManager.registerSynchronization(synchronization);
199-
if (synchronization instanceof IntegrationResourceHolderSynchronization) {
200-
IntegrationResourceHolder holder =
201-
((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder();
202-
holder.setMessage(message);
198+
if (synchronization != null) {
199+
TransactionSynchronizationManager.registerSynchronization(synchronization);
200+
201+
if (synchronization instanceof IntegrationResourceHolderSynchronization
202+
&& !TransactionSynchronizationManager.hasResource(ImapIdleChannelAdapter.this)) {
203+
204+
TransactionSynchronizationManager.bindResource(ImapIdleChannelAdapter.this,
205+
((IntegrationResourceHolderSynchronization) synchronization).getResourceHolder());
206+
}
207+
208+
Object resourceHolder =
209+
TransactionSynchronizationManager.getResource(ImapIdleChannelAdapter.this);
210+
if (resourceHolder instanceof IntegrationResourceHolder) {
211+
((IntegrationResourceHolder) resourceHolder).setMessage(message);
212+
}
203213
}
204214
}
205215
}
@@ -262,7 +272,7 @@ private class IdleTask implements Runnable {
262272

263273
@Override
264274
public void run() {
265-
final TaskScheduler scheduler = getTaskScheduler();
275+
final TaskScheduler scheduler = getTaskScheduler();
266276
Assert.notNull(scheduler, "'taskScheduler' must not be null");
267277
/*
268278
* The following shouldn't be necessary because doStart() will have ensured we have

0 commit comments

Comments
 (0)