diff --git a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundGatewayParserTests.java b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundGatewayParserTests.java index ee4ff131b96..d1c1c6a42ca 100644 --- a/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundGatewayParserTests.java +++ b/spring-integration-amqp/src/test/java/org/springframework/integration/amqp/config/AmqpInboundGatewayParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,7 @@ import java.lang.reflect.Field; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.springframework.amqp.core.Address; @@ -40,11 +39,11 @@ import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.MessageChannel; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.springframework.util.ReflectionUtils; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.isNull; /** @@ -55,8 +54,7 @@ * * @since 2.1 */ -@ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) +@SpringJUnitConfig @DirtiesContext public class AmqpInboundGatewayParserTests { @@ -66,16 +64,16 @@ public class AmqpInboundGatewayParserTests { @Test public void customMessageConverter() { Object gateway = context.getBean("gateway"); - MessageConverter gatewayConverter = TestUtils.getPropertyValue(gateway, "amqpMessageConverter", MessageConverter.class); - MessageConverter templateConverter = TestUtils.getPropertyValue(gateway, "amqpTemplate.messageConverter", MessageConverter.class); + MessageConverter gatewayConverter = + TestUtils.getPropertyValue(gateway, "amqpMessageConverter", MessageConverter.class); + MessageConverter templateConverter = + TestUtils.getPropertyValue(gateway, "amqpTemplate.messageConverter", MessageConverter.class); TestConverter testConverter = context.getBean("testConverter", TestConverter.class); assertThat(gatewayConverter).isSameAs(testConverter); assertThat(templateConverter).isSameAs(testConverter); assertThat(TestUtils.getPropertyValue(gateway, "autoStartup")).isEqualTo(Boolean.TRUE); assertThat(TestUtils.getPropertyValue(gateway, "phase")).isEqualTo(0); - assertThat(TestUtils.getPropertyValue(gateway, "replyTimeout", Long.class)).isEqualTo(Long.valueOf(1234L)); - assertThat(TestUtils.getPropertyValue(gateway, "messagingTemplate.receiveTimeout", Long.class)) - .isEqualTo(Long.valueOf(1234L)); + assertThat(TestUtils.getPropertyValue(gateway, "messagingTemplate.receiveTimeout")).isEqualTo(1234L); assertThat(TestUtils.getPropertyValue(gateway, "messageListenerContainer.missingQueuesFatal", Boolean.class)) .isTrue(); } @@ -145,14 +143,12 @@ public void verifyUsageWithHeaderMapper() throws Exception { @Test public void testInt2971HeaderMapperAndMappedHeadersExclusivity() { - try { - new ClassPathXmlApplicationContext("AmqpInboundGatewayParserTests-headerMapper-fail-context.xml", - this.getClass()).close(); - } - catch (BeanDefinitionParsingException e) { - assertThat(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " + - "is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'")).isTrue(); - } + assertThatExceptionOfType(BeanDefinitionParsingException.class) + .isThrownBy(() -> + new ClassPathXmlApplicationContext("AmqpInboundGatewayParserTests-headerMapper-fail-context.xml", + getClass())) + .withMessageStartingWith("Configuration problem: The 'header-mapper' attribute " + + "is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'"); } private static class TestConverter extends SimpleMessageConverter { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java index 8cf47678aa7..6af3f6de76c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/context/IntegrationProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,6 +37,7 @@ *
  • {@code spring.integration.endpoints.noAutoStartup=} *
  • {@code spring.integration.channels.error.requireSubscribers=true} *
  • {@code spring.integration.channels.error.ignoreFailures=true} + *
  • {@code spring.integration.endpoints.defaultTimeout=30000} * * * @author Artem Bilan @@ -112,6 +113,12 @@ public final class IntegrationProperties { */ public static final String ENDPOINTS_NO_AUTO_STARTUP = INTEGRATION_PROPERTIES_PREFIX + "endpoints.noAutoStartup"; + /** + * Specifies the default timeout for blocking operations like send and receive messages. + * @since 6.2 + */ + public static final String ENDPOINTS_DEFAULT_TIMEOUT = INTEGRATION_PROPERTIES_PREFIX + "endpoints.defaultTimeout"; + private static final Properties DEFAULTS; private boolean channelsAutoCreate = true; @@ -132,6 +139,8 @@ public final class IntegrationProperties { private String[] noAutoStartupEndpoints = {}; + private long endpointsDefaultTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT; + private volatile Properties properties; static { @@ -293,6 +302,23 @@ public String[] getNoAutoStartupEndpoints() { return Arrays.copyOf(this.noAutoStartupEndpoints, this.noAutoStartupEndpoints.length); } + /** + * Return the value of {@link #ENDPOINTS_DEFAULT_TIMEOUT} option. + * @return the value of {@link #ENDPOINTS_DEFAULT_TIMEOUT} option. + * @since 6.2 + */ + public long getEndpointsDefaultTimeout() { + return this.endpointsDefaultTimeout; + } + + /** + * Configure a value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option. + * @param endpointsDefaultTimeout the value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option. + */ + public void setEndpointsDefaultTimeout(long endpointsDefaultTimeout) { + this.endpointsDefaultTimeout = endpointsDefaultTimeout; + } + /** * Represent the current instance as a {@link Properties}. * @return the {@link Properties} representation. @@ -312,6 +338,7 @@ public Properties toProperties() { props.setProperty(READ_ONLY_HEADERS, StringUtils.arrayToCommaDelimitedString(this.readOnlyHeaders)); props.setProperty(ENDPOINTS_NO_AUTO_STARTUP, StringUtils.arrayToCommaDelimitedString(this.noAutoStartupEndpoints)); + props.setProperty(ENDPOINTS_DEFAULT_TIMEOUT, "" + this.endpointsDefaultTimeout); this.properties = props; } @@ -348,7 +375,9 @@ public static IntegrationProperties parse(Properties properties) { StringUtils.commaDelimitedListToStringArray(value))) .acceptIfHasText(properties.getProperty(ENDPOINTS_NO_AUTO_STARTUP), (value) -> integrationProperties.setNoAutoStartupEndpoints( - StringUtils.commaDelimitedListToStringArray(value))); + StringUtils.commaDelimitedListToStringArray(value))) + .acceptIfHasText(properties.getProperty(ENDPOINTS_DEFAULT_TIMEOUT), + (value) -> integrationProperties.setEndpointsDefaultTimeout(Long.parseLong(value))); return integrationProperties; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java index 98c4f9e86cf..dfd358fc23f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java @@ -35,7 +35,6 @@ import org.springframework.integration.IntegrationPatternType; import org.springframework.integration.MessageTimeoutException; import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel; -import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.endpoint.AbstractEndpoint; import org.springframework.integration.endpoint.EventDrivenConsumer; @@ -124,7 +123,9 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint private String errorChannelName; - private long replyTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT; + private boolean requestTimeoutSet; + + private boolean replyTimeoutSet; private InboundMessageMapper requestMapper = new DefaultRequestMapper(); @@ -167,8 +168,6 @@ public MessagingGatewaySupport() { public MessagingGatewaySupport(boolean errorOnTimeout) { ConvertingMessagingTemplate template = new ConvertingMessagingTemplate(); template.setMessageConverter(this.messageConverter); - template.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT); - template.setReceiveTimeout(this.replyTimeout); this.messagingTemplate = template; this.errorOnTimeout = errorOnTimeout; } @@ -252,6 +251,7 @@ public void setErrorChannelName(String errorChannelName) { */ public void setRequestTimeout(long requestTimeout) { this.messagingTemplate.setSendTimeout(requestTimeout); + this.requestTimeoutSet = true; } /** @@ -260,8 +260,8 @@ public void setRequestTimeout(long requestTimeout) { * @param replyTimeout the timeout value in milliseconds */ public void setReplyTimeout(long replyTimeout) { - this.replyTimeout = replyTimeout; this.messagingTemplate.setReceiveTimeout(replyTimeout); + this.replyTimeoutSet = true; } /** @@ -406,6 +406,13 @@ protected void onInit() { } this.messageConverter.setBeanFactory(beanFactory); } + long endpointsDefaultTimeout = getIntegrationProperties().getEndpointsDefaultTimeout(); + if (!this.requestTimeoutSet) { + this.messagingTemplate.setSendTimeout(endpointsDefaultTimeout); + } + if (!this.replyTimeoutSet) { + this.messagingTemplate.setReceiveTimeout(endpointsDefaultTimeout); + } this.initialized = true; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index 20f139c1215..6bf0d3432ed 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -89,9 +89,7 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan private boolean noHeadersPropagation; - { - this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT); - } + private boolean sendTimeoutSet; /** * Set the timeout for sending reply Messages. @@ -99,6 +97,7 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan */ public void setSendTimeout(long sendTimeout) { this.messagingTemplate.setSendTimeout(sendTimeout); + this.sendTimeoutSet = true; } @Override @@ -189,7 +188,7 @@ protected final void updateNotPropagatedHeaders(String[] headers, boolean merge) @Override public Collection getNotPropagatedHeaders() { return this.notPropagatedHeaders != null - ? Collections.unmodifiableSet(new HashSet<>(Arrays.asList(this.notPropagatedHeaders))) + ? Set.of(this.notPropagatedHeaders) : Collections.emptyList(); } @@ -217,6 +216,9 @@ protected void onInit() { } this.messagingTemplate.setDestinationResolver(getChannelResolver()); setAsyncIfCan(); + if (!this.sendTimeoutSet) { + this.messagingTemplate.setSendTimeout(getIntegrationProperties().getEndpointsDefaultTimeout()); + } } private void setAsyncIfCan() { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java b/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java index 2a89d13dabb..a174708c957 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java @@ -25,7 +25,6 @@ import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.integration.IntegrationPatternType; -import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.handler.AbstractMessageHandler; import org.springframework.integration.support.management.IntegrationManagedResource; @@ -63,9 +62,7 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler imple private volatile boolean applySequence; - { - this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT); - } + private boolean sendTimeoutSet; /** * Set the default channel where Messages should be sent if channel resolution @@ -115,10 +112,11 @@ public void setDefaultOutputChannelName(String defaultOutputChannelName) { */ public void setSendTimeout(long timeout) { this.messagingTemplate.setSendTimeout(timeout); + this.sendTimeoutSet = true; } /** - * Specify whether send failures for one or more of the recipients should be ignored. By default this is + * Specify whether send failures for one or more of the recipients should be ignored. By default, this is * false meaning that an Exception will be thrown whenever a send fails. To override this and suppress * Exceptions, set the value to true. * @param ignoreSendFailures true to ignore send failures. @@ -174,6 +172,10 @@ protected void onInit() { if (beanFactory != null) { this.messagingTemplate.setBeanFactory(beanFactory); } + + if (!this.sendTimeoutSet) { + this.messagingTemplate.setSendTimeout(getIntegrationProperties().getEndpointsDefaultTimeout()); + } } /** diff --git a/spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java index e565eabc273..35d17eec846 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/scattergather/ScatterGatherHandler.java @@ -67,7 +67,7 @@ public class ScatterGatherHandler extends AbstractReplyProducingMessageHandler i private String errorChannelName = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME; - private long gatherTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT; + private Long gatherTimeout; private AbstractEndpoint gatherEndpoint; @@ -119,6 +119,10 @@ public IntegrationPatternType getIntegrationPatternType() { @Override protected void doInit() { + if (this.gatherTimeout == null) { + this.gatherTimeout = getIntegrationProperties().getEndpointsDefaultTimeout(); + } + BeanFactory beanFactory = getBeanFactory(); if (this.gatherChannel == null) { this.gatherChannel = diff --git a/spring-integration-core/src/main/resources/META-INF/spring.integration.default.properties b/spring-integration-core/src/main/resources/META-INF/spring.integration.default.properties index 23243f2f49e..573f1916e63 100644 --- a/spring-integration-core/src/main/resources/META-INF/spring.integration.default.properties +++ b/spring-integration-core/src/main/resources/META-INF/spring.integration.default.properties @@ -8,3 +8,4 @@ spring.integration.messagingTemplate.throwExceptionOnLateReply=false # Defaults to MessageHeaders.ID and MessageHeaders.TIMESTAMP spring.integration.readOnly.headers= spring.integration.endpoints.noAutoStartup= +spring.integration.endpoints.defaultTimeout=30000 diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java index db5f376e99e..e83d9702cbd 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/CorrelatingMessageHandlerTests.java @@ -71,9 +71,10 @@ public void initializeSubject() { outputChannel = mock(MessageChannel.class); handler = new AggregatingMessageHandler(processor, store, correlationStrategy, ReleaseStrategy); handler.setOutputChannel(outputChannel); + handler.setBeanFactory(mock()); + handler.afterPropertiesSet(); } - @Test public void bufferCompletesNormally() { String correlationKey = "key"; @@ -95,7 +96,7 @@ public void bufferCompletesNormally() { } @Test - public void bufferCompletesWithException() throws Exception { + public void bufferCompletesWithException() { doAnswer(new ThrowsException(new RuntimeException("Planned test exception"))) .when(processor).processMessageGroup(isA(SimpleMessageGroup.class)); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java index 16e9cb6a226..429e141b85b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/ResequencerParserTests.java @@ -59,7 +59,7 @@ void testDefaultResequencerProperties() { ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler", ResequencingMessageHandler.class); assertThat(getPropertyValue(resequencer, "outputChannel")).isNull(); - assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")).isEqualTo(30000L); + assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")).isEqualTo(45000L); assertThat(getPropertyValue(resequencer, "sendPartialResultOnExpiry")) .as("The ResequencerEndpoint is not configured with the appropriate 'send partial results on " + "timeout'" + diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java index 3493d977906..04bb0410de8 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AggregatorAnnotationTests.java @@ -51,7 +51,7 @@ public void testAnnotationWithDefaultSettings() { assertThat(getPropertyValue(aggregator, "releaseStrategy") instanceof SimpleSequenceSizeReleaseStrategy) .isTrue(); assertThat(getPropertyValue(aggregator, "outputChannel")).isNull(); - assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(30000L); + assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(45000L); assertThat(getPropertyValue(aggregator, "sendPartialResultOnExpiry")).isEqualTo(false); context.close(); } @@ -72,7 +72,7 @@ public void testAnnotationWithCustomSettings() { } @Test - public void testAnnotationWithCustomReleaseStrategy() throws Exception { + public void testAnnotationWithCustomReleaseStrategy() { ConfigurableApplicationContext context = new ClassPathXmlApplicationContext( new String[] {"classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml"}); final String endpointName = "endpointWithDefaultAnnotationAndCustomReleaseStrategy"; @@ -90,7 +90,7 @@ public void testAnnotationWithCustomReleaseStrategy() throws Exception { } @Test - public void testAnnotationWithCustomCorrelationStrategy() throws Exception { + public void testAnnotationWithCustomCorrelationStrategy() { ConfigurableApplicationContext context = new ClassPathXmlApplicationContext( new String[] {"classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml"}); final String endpointName = "endpointWithCorrelationStrategy"; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherParserTests.java index 07b1380be75..0b0fa1e157c 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/xml/HeaderEnricherParserTests.java @@ -49,7 +49,7 @@ class HeaderEnricherParserTests { void sendTimeoutDefault() { Object endpoint = context.getBean("headerEnricherWithDefaults"); long sendTimeout = TestUtils.getPropertyValue(endpoint, "handler.messagingTemplate.sendTimeout", Long.class); - assertThat(sendTimeout).isEqualTo(30000L); + assertThat(sendTimeout).isEqualTo(45000L); } @Test diff --git a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java index e56b3b6fd75..0af3d08404a 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java @@ -640,7 +640,7 @@ public void testMetaAnnotations() { assertThat(TestUtils.getPropertyValue(consumer, "handler.outputChannelName")).isEqualTo("annOutput"); assertThat(TestUtils.getPropertyValue(consumer, "handler.discardChannelName")).isEqualTo("annOutput"); assertThat(TestUtils.getPropertyValue(consumer, "trigger.period")).isEqualTo(Duration.ofSeconds(1)); - assertThat(TestUtils.getPropertyValue(consumer, "handler.messagingTemplate.sendTimeout")).isEqualTo(30000L); + assertThat(TestUtils.getPropertyValue(consumer, "handler.messagingTemplate.sendTimeout")).isEqualTo(45000L); assertThat(TestUtils.getPropertyValue(consumer, "handler.sendPartialResultOnExpiry", Boolean.class)).isFalse(); consumer = this.context.getBean("annotationTestService.annAgg2.aggregator", PollingConsumer.class); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayXmlAndAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayXmlAndAnnotationTests.java index 1dde2909674..527e51965d9 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayXmlAndAnnotationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayXmlAndAnnotationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,22 +58,22 @@ public void test() { switch (entry.getKey().getName()) { case "annotationShouldNotOverrideDefault" -> { assertThat(TestUtils.getPropertyValue(entry.getValue(), - "replyTimeout")).isEqualTo(123L); + "messagingTemplate.receiveTimeout")).isEqualTo(123L); assertions++; } case "annotationShouldOverrideDefault" -> { assertThat(TestUtils.getPropertyValue(entry.getValue(), - "replyTimeout")).isEqualTo(234L); + "messagingTemplate.receiveTimeout")).isEqualTo(234L); assertions++; } case "annotationShouldOverrideDefaultToInfinity" -> { assertThat(TestUtils.getPropertyValue(entry.getValue(), - "replyTimeout")).isEqualTo(-1L); + "messagingTemplate.receiveTimeout")).isEqualTo(-1L); assertions++; } case "explicitTimeoutShouldOverrideDefault" -> { assertThat(TestUtils.getPropertyValue(entry.getValue(), - "replyTimeout")).isEqualTo(456L); + "messagingTemplate.receiveTimeout")).isEqualTo(456L); assertions++; } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandlerTests.java index 25604ab17d7..7e3fb1eb74e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/handler/AbstractReplyProducingMessageHandlerTests.java @@ -61,6 +61,8 @@ protected Object handleRequestMessage(Message requestMessage) { @BeforeEach void setup() { channel = mock(MessageChannel.class); + handler.setBeanFactory(mock()); + handler.afterPropertiesSet(); } @Test @@ -91,6 +93,9 @@ protected Object handleRequestMessage(Message requestMessage) { assertThat(handler.getNotPropagatedHeaders()).isEmpty(); handler.setNotPropagatedHeaders("f*", "*r"); handler.setOutputChannel(this.channel); + handler.setBeanFactory(mock()); + handler.afterPropertiesSet(); + assertThat(handler.getNotPropagatedHeaders()).contains("f*", "*r"); ArgumentCaptor> captor = ArgumentCaptor.forClass(Message.class); willReturn(true).given(this.channel).send(captor.capture(), eq(30000L)); @@ -120,6 +125,9 @@ protected Object handleRequestMessage(Message requestMessage) { handler.addNotPropagatedHeaders("boom"); assertThat(handler.getNotPropagatedHeaders()).contains("boom"); handler.setOutputChannel(this.channel); + handler.setBeanFactory(mock()); + handler.afterPropertiesSet(); + ArgumentCaptor> captor = ArgumentCaptor.forClass(Message.class); willReturn(true).given(this.channel).send(captor.capture(), eq(30000L)); handler.handleMessage(MessageBuilder.withPayload("hello") @@ -149,6 +157,9 @@ protected Object handleRequestMessage(Message requestMessage) { handler.setNotPropagatedHeaders("foo"); handler.addNotPropagatedHeaders("b*r"); handler.setOutputChannel(this.channel); + handler.setBeanFactory(mock()); + handler.afterPropertiesSet(); + assertThat(handler.getNotPropagatedHeaders()).contains("foo", "b*r"); ArgumentCaptor> captor = ArgumentCaptor.forClass(Message.class); willReturn(true).given(this.channel).send(captor.capture(), eq(30000L)); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/router/config/RecipientListRouterParserTests.java b/spring-integration-core/src/test/java/org/springframework/integration/router/config/RecipientListRouterParserTests.java index c232d68c0be..c5c85292ed1 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/router/config/RecipientListRouterParserTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/router/config/RecipientListRouterParserTests.java @@ -73,7 +73,7 @@ public void simpleRouter() { assertThat(handler.getClass()).isEqualTo(RecipientListRouter.class); RecipientListRouter router = (RecipientListRouter) handler; DirectFieldAccessor accessor = new DirectFieldAccessor(router); - assertThat(TestUtils.getPropertyValue(router, "messagingTemplate.sendTimeout")).isEqualTo(30000L); + assertThat(TestUtils.getPropertyValue(router, "messagingTemplate.sendTimeout")).isEqualTo(45000L); assertThat(accessor.getPropertyValue("applySequence")).isEqualTo(Boolean.FALSE); assertThat(accessor.getPropertyValue("ignoreSendFailures")).isEqualTo(Boolean.FALSE); } diff --git a/spring-integration-core/src/test/resources/META-INF/spring.integration.properties b/spring-integration-core/src/test/resources/META-INF/spring.integration.properties index 3f18726a704..0298eb27188 100644 --- a/spring-integration-core/src/test/resources/META-INF/spring.integration.properties +++ b/spring-integration-core/src/test/resources/META-INF/spring.integration.properties @@ -4,3 +4,4 @@ spring.integration.taskScheduler.poolSize=20 spring.integration.messagingTemplate.throwExceptionOnLateReply=true spring.integration.endpoints.noAutoStartup=fooService*,stringSupplierEndpoint +spring.integration.endpoints.defaultTimeout=45000 diff --git a/spring-integration-ip/src/test/java/org/springframework/integration/ip/config/ParserUnitTests.java b/spring-integration-ip/src/test/java/org/springframework/integration/ip/config/ParserUnitTests.java index 14288841149..8f106d27bc6 100644 --- a/spring-integration-ip/src/test/java/org/springframework/integration/ip/config/ParserUnitTests.java +++ b/spring-integration-ip/src/test/java/org/springframework/integration/ip/config/ParserUnitTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -447,7 +447,7 @@ public void testOutTcp() { public void testInGateway1() { DirectFieldAccessor dfa = new DirectFieldAccessor(tcpInboundGateway1); assertThat(dfa.getPropertyValue("serverConnectionFactory")).isSameAs(cfS2); - assertThat(dfa.getPropertyValue("replyTimeout")).isEqualTo(456L); + assertThat(TestUtils.getPropertyValue(tcpInboundGateway1, "messagingTemplate.receiveTimeout")).isEqualTo(456L); assertThat(tcpInboundGateway1.getComponentName()).isEqualTo("inGateway1"); assertThat(tcpInboundGateway1.getComponentType()).isEqualTo("ip:tcp-inbound-gateway"); assertThat(tcpInboundGateway1.getErrorChannel()).isEqualTo(errorChannel); @@ -463,7 +463,7 @@ public void testInGateway1() { public void testInGateway2() { DirectFieldAccessor dfa = new DirectFieldAccessor(tcpInboundGateway2); assertThat(dfa.getPropertyValue("serverConnectionFactory")).isSameAs(cfS3); - assertThat(dfa.getPropertyValue("replyTimeout")).isEqualTo(456L); + assertThat(TestUtils.getPropertyValue(tcpInboundGateway2, "messagingTemplate.receiveTimeout")).isEqualTo(456L); assertThat(tcpInboundGateway2.getComponentName()).isEqualTo("inGateway2"); assertThat(tcpInboundGateway2.getComponentType()).isEqualTo("ip:tcp-inbound-gateway"); assertThat(dfa.getPropertyValue("errorChannel")).isNull(); diff --git a/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbound/JpaOutboundGatewayFactoryBean.java b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbound/JpaOutboundGatewayFactoryBean.java index 11f546201f8..6f768d2b505 100644 --- a/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbound/JpaOutboundGatewayFactoryBean.java +++ b/spring-integration-jpa/src/main/java/org/springframework/integration/jpa/outbound/JpaOutboundGatewayFactoryBean.java @@ -17,9 +17,9 @@ package org.springframework.integration.jpa.outbound; import org.springframework.integration.config.AbstractSimpleMessageHandlerFactoryBean; -import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.jpa.core.JpaExecutor; import org.springframework.integration.jpa.support.OutboundGatewayType; +import org.springframework.lang.Nullable; /** * The {@link JpaOutboundGatewayFactoryBean} creates instances of the @@ -44,7 +44,8 @@ public class JpaOutboundGatewayFactoryBean extends AbstractSimpleMessageHandlerF private boolean producesReply = true; - private long replyTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT; + @Nullable + private Long replyTimeout; private boolean requiresReply = false; @@ -79,7 +80,9 @@ protected JpaOutboundGateway createHandler() { JpaOutboundGateway jpaOutboundGateway = new JpaOutboundGateway(this.jpaExecutor); jpaOutboundGateway.setGatewayType(this.gatewayType); jpaOutboundGateway.setProducesReply(this.producesReply); - jpaOutboundGateway.setSendTimeout(this.replyTimeout); + if (this.replyTimeout != null) { + jpaOutboundGateway.setSendTimeout(this.replyTimeout); + } jpaOutboundGateway.setRequiresReply(this.requiresReply); return jpaOutboundGateway; } diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests.java index ff885b5f043..da03ee622ac 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaInboundGatewayTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2022 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,7 +67,7 @@ public void testProps() { assertThat(TestUtils.getPropertyValue(this.gateway1, "onPartitionsAssignedSeekCallback")) .isSameAs(this.context.getBean("onPartitionsAssignedSeekCallback")); assertThat(TestUtils.getPropertyValue(this.gateway1, "messagingTemplate.sendTimeout")).isEqualTo(5000L); - assertThat(TestUtils.getPropertyValue(this.gateway1, "replyTimeout")).isEqualTo(43L); + assertThat(TestUtils.getPropertyValue(this.gateway1, "messagingTemplate.receiveTimeout")).isEqualTo(43L); assertThat(TestUtils.getPropertyValue(this.gateway1, "bindSourceRecord", Boolean.class)).isTrue(); } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisQueueInboundGatewayParserTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisQueueInboundGatewayParserTests.java index ac6e65adc66..ee95a08c6a6 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisQueueInboundGatewayParserTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/config/RedisQueueInboundGatewayParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,8 @@ package org.springframework.integration.redis.config; -import org.junit.Test; -import org.junit.runner.RunWith; + +import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -26,8 +26,7 @@ import org.springframework.integration.test.util.TestUtils; import org.springframework.messaging.MessageChannel; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import static org.assertj.core.api.Assertions.assertThat; @@ -39,8 +38,7 @@ * * since 4.1 */ -@ContextConfiguration -@RunWith(SpringJUnit4ClassRunner.class) +@SpringJUnitConfig @DirtiesContext public class RedisQueueInboundGatewayParserTests { @@ -65,20 +63,20 @@ public class RedisQueueInboundGatewayParserTests { @Test - public void testDefaultConfig() throws Exception { + public void testDefaultConfig() { assertThat(TestUtils.getPropertyValue(this.defaultGateway, "extractPayload", Boolean.class)).isFalse(); assertThat(TestUtils.getPropertyValue(this.defaultGateway, "serializer")).isSameAs(this.serializer); assertThat(TestUtils.getPropertyValue(this.defaultGateway, "serializerExplicitlySet", Boolean.class)).isTrue(); assertThat(this.defaultGateway.getReplyChannel()).isSameAs(this.receiveChannel); assertThat(this.defaultGateway.getRequestChannel()).isSameAs(this.requestChannel); - assertThat(TestUtils.getPropertyValue(this.defaultGateway, "replyTimeout")).isEqualTo(2000L); + assertThat(TestUtils.getPropertyValue(this.defaultGateway, "messagingTemplate.receiveTimeout")).isEqualTo(2000L); assertThat(TestUtils.getPropertyValue(this.defaultGateway, "taskExecutor")).isNotNull(); assertThat(TestUtils.getPropertyValue(this.defaultGateway, "autoStartup", Boolean.class)).isFalse(); assertThat(TestUtils.getPropertyValue(this.defaultGateway, "phase")).isEqualTo(3); } @Test - public void testZeroReceiveTimeoutConfig() throws Exception { + public void testZeroReceiveTimeoutConfig() { assertThat(TestUtils.getPropertyValue(this.zeroReceiveTimeoutGateway, "receiveTimeout")).isEqualTo(0L); } diff --git a/spring-integration-ws/src/test/java/org/springframework/integration/ws/config/WebServiceInboundGatewayParserTests.java b/spring-integration-ws/src/test/java/org/springframework/integration/ws/config/WebServiceInboundGatewayParserTests.java index 217434b75de..5041cb3ba2a 100644 --- a/spring-integration-ws/src/test/java/org/springframework/integration/ws/config/WebServiceInboundGatewayParserTests.java +++ b/spring-integration-ws/src/test/java/org/springframework/integration/ws/config/WebServiceInboundGatewayParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,8 +20,7 @@ import java.util.Map; import java.util.Properties; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.springframework.beans.DirectFieldAccessor; @@ -39,8 +38,7 @@ import org.springframework.messaging.PollableChannel; import org.springframework.oxm.Unmarshaller; import org.springframework.test.annotation.DirtiesContext; -import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import org.springframework.ws.context.DefaultMessageContext; import org.springframework.ws.context.MessageContext; import org.springframework.ws.soap.SoapMessage; @@ -57,8 +55,7 @@ * @author Stephane Nicoll * @author Artem Bilan */ -@RunWith(SpringJUnit4ClassRunner.class) -@ContextConfiguration +@SpringJUnitConfig @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) public class WebServiceInboundGatewayParserTests { @@ -179,9 +176,7 @@ public void testHeaderMapperReference() { @Test public void testReplyTimeout() { - DirectFieldAccessor accessor = new DirectFieldAccessor(replyTimeoutGateway); - Object replyTimeout = accessor.getPropertyValue("replyTimeout"); - assertThat(replyTimeout).isEqualTo(1234L); + assertThat(TestUtils.getPropertyValue(replyTimeoutGateway, "messagingTemplate.receiveTimeout")).isEqualTo(1234L); } @@ -189,8 +184,7 @@ public void testReplyTimeout() { private static class TestHeaderMapper implements SoapHeaderMapper { @Override - public void fromHeadersToRequest(MessageHeaders headers, - SoapMessage target) { + public void fromHeadersToRequest(MessageHeaders headers, SoapMessage target) { } @Override diff --git a/src/reference/antora/modules/ROOT/pages/configuration/global-properties.adoc b/src/reference/antora/modules/ROOT/pages/configuration/global-properties.adoc index 33271e3565c..acae7aa00a9 100644 --- a/src/reference/antora/modules/ROOT/pages/configuration/global-properties.adoc +++ b/src/reference/antora/modules/ROOT/pages/configuration/global-properties.adoc @@ -6,6 +6,7 @@ Certain global framework properties can be overridden by providing a properties The default properties can be found in `org.springframework.integration.context.IntegrationProperties` class. The following listing shows the default values: +==== [source] ---- spring.integration.channels.autoCreate=true <1> @@ -17,6 +18,7 @@ spring.integration.readOnly.headers= <6> spring.integration.endpoints.noAutoStartup= <7> spring.integration.channels.error.requireSubscribers=true <8> spring.integration.channels.error.ignoreFailures=true <9> +spring.integration.endpoints.defaultTimeout=30000 <10> ---- <1> When true, `input-channel` instances are automatically declared as `DirectChannel` instances when not explicitly found in the application context. @@ -51,11 +53,18 @@ See xref:scatter-gather.adoc#scatter-gather-error-handling[Error Handling] for m <9> A boolean flag to indicate that default global `errorChannel` must ignore dispatching errors and pass the message to the next handler. Since version 5.5. +<10> The default number of milliseconds for request and reply timeouts in endpoints. +Default value is 30 seconds to avoid indefinite blocking. +Can be configured to a negative value to restore infinite blocking behavior in endpoints. +Since version 6.2. +==== + These properties can be overridden by adding a `/META-INF/spring.integration.properties` file to the classpath or an `IntegrationContextUtils.INTEGRATION_GLOBAL_PROPERTIES_BEAN_NAME` bean for the `org.springframework.integration.context.IntegrationProperties` instance. You need not provide all the properties -- only those that you want to override. Starting with version 5.1, all the merged global properties are printed in the logs after application context startup when a `DEBUG` logic level is turned on for the `org.springframework.integration` category. The output looks like this: + [source] ---- Spring Integration global properties: @@ -67,5 +76,6 @@ spring.integration.channels.autoCreate=true spring.integration.channels.maxBroadcastSubscribers=0x7fffffff spring.integration.readOnly.headers= spring.integration.messagingTemplate.throwExceptionOnLateReply=true +spring.integration.endpoints.defaultTimeout=-1 ---- diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 5862dee65a0..f7d37c348ca 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -22,7 +22,7 @@ In general the project has been moved to the latest dependency versions. ==== Debezium Inbound Channel Adapter The Debezium Engine based Change Data Capture (CDC) channel adapter, that allows capturing database change events, converting them into Messages and streaming those to the outbound channels. -See xref:debezium.adoc[Debezium Support] for more information. +See xref:debezium.adoc[Debezium Support] for more information. [[x6.2-general]] === General Changes @@ -33,6 +33,9 @@ See xref:endpoint.adoc#endpoint-pollingconsumer[Polling Consumer] for more infor - Java, Groovy and Kotlin DSLs have now context-specific methods in the `IntegrationFlowDefinition` with a single `Consumer` argument to configure an endpoint and its handler with one builder and readable options. See, for example, `transformWith()`, `splitWith()` in xref:dsl.adoc#java-dsl[ Java DSL Chapter]. +- A new `spring.integration.endpoints.defaultTimeout` global property has been introduced to override the default 30 seconds timeout for all the endpoints in the application. +See xref:configuration/global-properties.adoc[Global Properties] for more information. + [[x6.2-websockets]] === WebSockets Changes