Skip to content

Commit 915f5ad

Browse files
artembilanspring-builds
authored andcommitted
GH-9825: DelayerEndpointSpec: Set TaskScheduler to the handler as well
Fixes: #9825 Issue link: #9825 The `DelayerEndpointSpec` extends `ConsumerEndpointSpec` which has a `taskScheduler()` option. However this is set only to the endpoint for this `MessageHandler`. * Override `taskScheduler()` method on the `DelayerEndpointSpec` to set the provided `TaskScheduler` to the `DelayHandler` as well (cherry picked from commit 12fee0a)
1 parent b17a946 commit 915f5ad

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/DelayerEndpointSpec.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.integration.transaction.TransactionInterceptorBuilder;
3131
import org.springframework.messaging.Message;
3232
import org.springframework.messaging.MessageChannel;
33+
import org.springframework.scheduling.TaskScheduler;
3334
import org.springframework.transaction.TransactionManager;
3435
import org.springframework.transaction.interceptor.TransactionInterceptor;
3536
import org.springframework.util.Assert;
@@ -243,4 +244,16 @@ public DelayerEndpointSpec messageGroupId(String messageGroupId) {
243244
return this;
244245
}
245246

247+
/**
248+
* Set a provided {@link TaskScheduler} into the {@link DelayHandler},
249+
* as well as call {@code super} to set it into an endpoint for this handler (if necessary).
250+
* @param taskScheduler the {@link TaskScheduler} to use.
251+
* @return the spec
252+
*/
253+
@Override
254+
public DelayerEndpointSpec taskScheduler(TaskScheduler taskScheduler) {
255+
this.handler.setTaskScheduler(taskScheduler);
256+
return super.taskScheduler(taskScheduler);
257+
}
258+
246259
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.springframework.integration.endpoint.EventDrivenConsumer;
7373
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
7474
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
75+
import org.springframework.integration.handler.DelayHandler;
7576
import org.springframework.integration.handler.LoggingHandler;
7677
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
7778
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
@@ -163,6 +164,10 @@ public class IntegrationFlowTests {
163164
@Qualifier("bridgeFlow2Input")
164165
private MessageChannel bridgeFlow2Input;
165166

167+
@Autowired
168+
@Qualifier("delayer.handler")
169+
DelayHandler delayHandler;
170+
166171
@Autowired
167172
@Qualifier("bridgeFlow2Output")
168173
private PollableChannel bridgeFlow2Output;
@@ -266,6 +271,8 @@ public void testBridge() {
266271
assertThat(reply).isNotNull();
267272
assertThat(reply.getPayload()).isEqualTo("test");
268273
assertThat(this.delayedAdvice.getInvoked()).isTrue();
274+
275+
assertThat(TestUtils.getPropertyValue(this.delayHandler, "taskScheduler")).isSameAs(this.customScheduler);
269276
}
270277

271278
@Test
@@ -825,15 +832,17 @@ public IntegrationFlow bridgeFlow() {
825832
}
826833

827834
@Bean
828-
public IntegrationFlow bridgeFlow2() {
835+
public IntegrationFlow bridgeFlow2(TaskScheduler customScheduler) {
829836
return IntegrationFlow.from("bridgeFlow2Input")
830837
.bridge(c -> c.autoStartup(false).id("bridge"))
831838
.fixedSubscriberChannel()
832839
.delay(d -> d
833840
.messageGroupId("delayer")
834841
.delayExpression("200")
835842
.advice(this.delayedAdvice)
836-
.messageStore(this.messageStore()))
843+
.messageStore(messageStore())
844+
.taskScheduler(customScheduler)
845+
.id("delayer"))
837846
.channel(MessageChannels.queue("bridgeFlow2Output"))
838847
.get();
839848
}
@@ -846,8 +855,8 @@ public SimpleMessageStore messageStore() {
846855
@Bean
847856
public IntegrationFlow claimCheckFlow() {
848857
return IntegrationFlow.from("claimCheckInput")
849-
.claimCheckIn(this.messageStore())
850-
.claimCheckOut(this.messageStore())
858+
.claimCheckIn(messageStore())
859+
.claimCheckOut(messageStore())
851860
.get();
852861
}
853862

0 commit comments

Comments
 (0)