Skip to content

Commit cb85fe0

Browse files
artembilanspring-builds
authored andcommitted
GH-9825: DelayerEndpointSpec: Set TaskScheduler to the handler as well
Fixes: #9825 Issue link: #9825 The `DelayerEndpointSpec` extends `ConsumerEndpointSpec` which has a `taskScheduler()` option. However this is set only to the endpoint for this `MessageHandler`. * Override `taskScheduler()` method on the `DelayerEndpointSpec` to set the provided `TaskScheduler` to the `DelayHandler` as well (cherry picked from commit 12fee0a)
1 parent 7958d58 commit cb85fe0

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/DelayerEndpointSpec.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.integration.transaction.TransactionInterceptorBuilder;
3131
import org.springframework.messaging.Message;
3232
import org.springframework.messaging.MessageChannel;
33+
import org.springframework.scheduling.TaskScheduler;
3334
import org.springframework.transaction.TransactionManager;
3435
import org.springframework.transaction.interceptor.TransactionInterceptor;
3536
import org.springframework.util.Assert;
@@ -243,4 +244,16 @@ public DelayerEndpointSpec messageGroupId(String messageGroupId) {
243244
return this;
244245
}
245246

247+
/**
248+
* Set a provided {@link TaskScheduler} into the {@link DelayHandler},
249+
* as well as call {@code super} to set it into an endpoint for this handler (if necessary).
250+
* @param taskScheduler the {@link TaskScheduler} to use.
251+
* @return the spec
252+
*/
253+
@Override
254+
public DelayerEndpointSpec taskScheduler(TaskScheduler taskScheduler) {
255+
this.handler.setTaskScheduler(taskScheduler);
256+
return super.taskScheduler(taskScheduler);
257+
}
258+
246259
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/flows/IntegrationFlowTests.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import org.springframework.integration.endpoint.EventDrivenConsumer;
7373
import org.springframework.integration.gateway.GatewayProxyFactoryBean;
7474
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
75+
import org.springframework.integration.handler.DelayHandler;
7576
import org.springframework.integration.handler.LoggingHandler;
7677
import org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer;
7778
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
@@ -161,6 +162,10 @@ public class IntegrationFlowTests {
161162
@Qualifier("bridgeFlow2Input")
162163
private MessageChannel bridgeFlow2Input;
163164

165+
@Autowired
166+
@Qualifier("delayer.handler")
167+
DelayHandler delayHandler;
168+
164169
@Autowired
165170
@Qualifier("bridgeFlow2Output")
166171
private PollableChannel bridgeFlow2Output;
@@ -259,6 +264,8 @@ public void testBridge() {
259264
assertThat(reply).isNotNull();
260265
assertThat(reply.getPayload()).isEqualTo("test");
261266
assertThat(this.delayedAdvice.getInvoked()).isTrue();
267+
268+
assertThat(TestUtils.getPropertyValue(this.delayHandler, "taskScheduler")).isSameAs(this.customScheduler);
262269
}
263270

264271
@Test
@@ -812,15 +819,17 @@ public IntegrationFlow bridgeFlow() {
812819
}
813820

814821
@Bean
815-
public IntegrationFlow bridgeFlow2() {
822+
public IntegrationFlow bridgeFlow2(TaskScheduler customScheduler) {
816823
return IntegrationFlow.from("bridgeFlow2Input")
817824
.bridge(c -> c.autoStartup(false).id("bridge"))
818825
.fixedSubscriberChannel()
819826
.delay(d -> d
820827
.messageGroupId("delayer")
821828
.delayExpression("200")
822829
.advice(this.delayedAdvice)
823-
.messageStore(this.messageStore()))
830+
.messageStore(messageStore())
831+
.taskScheduler(customScheduler)
832+
.id("delayer"))
824833
.channel(MessageChannels.queue("bridgeFlow2Output"))
825834
.get();
826835
}
@@ -833,8 +842,8 @@ public SimpleMessageStore messageStore() {
833842
@Bean
834843
public IntegrationFlow claimCheckFlow() {
835844
return IntegrationFlow.from("claimCheckInput")
836-
.claimCheckIn(this.messageStore())
837-
.claimCheckOut(this.messageStore())
845+
.claimCheckIn(messageStore())
846+
.claimCheckOut(messageStore())
838847
.get();
839848
}
840849

0 commit comments

Comments
 (0)