Skip to content

GH-8704: Add global property for defaultTimeout #8706

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,7 @@

import java.lang.reflect.Field;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import org.springframework.amqp.core.Address;
Expand All @@ -40,11 +39,11 @@
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.ReflectionUtils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.isNull;

/**
Expand All @@ -55,8 +54,7 @@
*
* @since 2.1
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@SpringJUnitConfig
@DirtiesContext
public class AmqpInboundGatewayParserTests {

Expand All @@ -66,16 +64,16 @@ public class AmqpInboundGatewayParserTests {
@Test
public void customMessageConverter() {
Object gateway = context.getBean("gateway");
MessageConverter gatewayConverter = TestUtils.getPropertyValue(gateway, "amqpMessageConverter", MessageConverter.class);
MessageConverter templateConverter = TestUtils.getPropertyValue(gateway, "amqpTemplate.messageConverter", MessageConverter.class);
MessageConverter gatewayConverter =
TestUtils.getPropertyValue(gateway, "amqpMessageConverter", MessageConverter.class);
MessageConverter templateConverter =
TestUtils.getPropertyValue(gateway, "amqpTemplate.messageConverter", MessageConverter.class);
TestConverter testConverter = context.getBean("testConverter", TestConverter.class);
assertThat(gatewayConverter).isSameAs(testConverter);
assertThat(templateConverter).isSameAs(testConverter);
assertThat(TestUtils.getPropertyValue(gateway, "autoStartup")).isEqualTo(Boolean.TRUE);
assertThat(TestUtils.getPropertyValue(gateway, "phase")).isEqualTo(0);
assertThat(TestUtils.getPropertyValue(gateway, "replyTimeout", Long.class)).isEqualTo(Long.valueOf(1234L));
assertThat(TestUtils.getPropertyValue(gateway, "messagingTemplate.receiveTimeout", Long.class))
.isEqualTo(Long.valueOf(1234L));
assertThat(TestUtils.getPropertyValue(gateway, "messagingTemplate.receiveTimeout")).isEqualTo(1234L);
assertThat(TestUtils.getPropertyValue(gateway, "messageListenerContainer.missingQueuesFatal", Boolean.class))
.isTrue();
}
Expand Down Expand Up @@ -145,14 +143,12 @@ public void verifyUsageWithHeaderMapper() throws Exception {

@Test
public void testInt2971HeaderMapperAndMappedHeadersExclusivity() {
try {
new ClassPathXmlApplicationContext("AmqpInboundGatewayParserTests-headerMapper-fail-context.xml",
this.getClass()).close();
}
catch (BeanDefinitionParsingException e) {
assertThat(e.getMessage().startsWith("Configuration problem: The 'header-mapper' attribute " +
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'")).isTrue();
}
assertThatExceptionOfType(BeanDefinitionParsingException.class)
.isThrownBy(() ->
new ClassPathXmlApplicationContext("AmqpInboundGatewayParserTests-headerMapper-fail-context.xml",
getClass()))
.withMessageStartingWith("Configuration problem: The 'header-mapper' attribute " +
"is mutually exclusive with 'mapped-request-headers' or 'mapped-reply-headers'");
}

private static class TestConverter extends SimpleMessageConverter {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2022 the original author or authors.
* Copyright 2014-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -37,6 +37,7 @@
* <li> {@code spring.integration.endpoints.noAutoStartup=}
* <li> {@code spring.integration.channels.error.requireSubscribers=true}
* <li> {@code spring.integration.channels.error.ignoreFailures=true}
* <li> {@code spring.integration.endpoints.defaultTimeout=30000}
* </ul>
*
* @author Artem Bilan
Expand Down Expand Up @@ -112,6 +113,12 @@ public final class IntegrationProperties {
*/
public static final String ENDPOINTS_NO_AUTO_STARTUP = INTEGRATION_PROPERTIES_PREFIX + "endpoints.noAutoStartup";

/**
* Specifies the default timeout for blocking operations like send and receive messages.
* @since 6.2
*/
public static final String ENDPOINTS_DEFAULT_TIMEOUT = INTEGRATION_PROPERTIES_PREFIX + "endpoints.defaultTimeout";

private static final Properties DEFAULTS;

private boolean channelsAutoCreate = true;
Expand All @@ -132,6 +139,8 @@ public final class IntegrationProperties {

private String[] noAutoStartupEndpoints = {};

private long endpointsDefaultTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;

private volatile Properties properties;

static {
Expand Down Expand Up @@ -293,6 +302,23 @@ public String[] getNoAutoStartupEndpoints() {
return Arrays.copyOf(this.noAutoStartupEndpoints, this.noAutoStartupEndpoints.length);
}

/**
* Return the value of {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @return the value of {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @since 6.2
*/
public long getEndpointsDefaultTimeout() {
return this.endpointsDefaultTimeout;
}

/**
* Configure a value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
* @param endpointsDefaultTimeout the value for {@link #ENDPOINTS_DEFAULT_TIMEOUT} option.
*/
public void setEndpointsDefaultTimeout(long endpointsDefaultTimeout) {
this.endpointsDefaultTimeout = endpointsDefaultTimeout;
}

/**
* Represent the current instance as a {@link Properties}.
* @return the {@link Properties} representation.
Expand All @@ -312,6 +338,7 @@ public Properties toProperties() {
props.setProperty(READ_ONLY_HEADERS, StringUtils.arrayToCommaDelimitedString(this.readOnlyHeaders));
props.setProperty(ENDPOINTS_NO_AUTO_STARTUP,
StringUtils.arrayToCommaDelimitedString(this.noAutoStartupEndpoints));
props.setProperty(ENDPOINTS_DEFAULT_TIMEOUT, "" + this.endpointsDefaultTimeout);

this.properties = props;
}
Expand Down Expand Up @@ -348,7 +375,9 @@ public static IntegrationProperties parse(Properties properties) {
StringUtils.commaDelimitedListToStringArray(value)))
.acceptIfHasText(properties.getProperty(ENDPOINTS_NO_AUTO_STARTUP),
(value) -> integrationProperties.setNoAutoStartupEndpoints(
StringUtils.commaDelimitedListToStringArray(value)));
StringUtils.commaDelimitedListToStringArray(value)))
.acceptIfHasText(properties.getProperty(ENDPOINTS_DEFAULT_TIMEOUT),
(value) -> integrationProperties.setEndpointsDefaultTimeout(Long.parseLong(value)));
return integrationProperties;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
Expand Down Expand Up @@ -124,7 +123,9 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint

private String errorChannelName;

private long replyTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
private boolean requestTimeoutSet;

private boolean replyTimeoutSet;

private InboundMessageMapper<Object> requestMapper = new DefaultRequestMapper();

Expand Down Expand Up @@ -167,8 +168,6 @@ public MessagingGatewaySupport() {
public MessagingGatewaySupport(boolean errorOnTimeout) {
ConvertingMessagingTemplate template = new ConvertingMessagingTemplate();
template.setMessageConverter(this.messageConverter);
template.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);
template.setReceiveTimeout(this.replyTimeout);
this.messagingTemplate = template;
this.errorOnTimeout = errorOnTimeout;
}
Expand Down Expand Up @@ -252,6 +251,7 @@ public void setErrorChannelName(String errorChannelName) {
*/
public void setRequestTimeout(long requestTimeout) {
this.messagingTemplate.setSendTimeout(requestTimeout);
this.requestTimeoutSet = true;
}

/**
Expand All @@ -260,8 +260,8 @@ public void setRequestTimeout(long requestTimeout) {
* @param replyTimeout the timeout value in milliseconds
*/
public void setReplyTimeout(long replyTimeout) {
this.replyTimeout = replyTimeout;
this.messagingTemplate.setReceiveTimeout(replyTimeout);
this.replyTimeoutSet = true;
}

/**
Expand Down Expand Up @@ -406,6 +406,13 @@ protected void onInit() {
}
this.messageConverter.setBeanFactory(beanFactory);
}
long endpointsDefaultTimeout = getIntegrationProperties().getEndpointsDefaultTimeout();
if (!this.requestTimeoutSet) {
this.messagingTemplate.setSendTimeout(endpointsDefaultTimeout);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentional? Previously, the send timeout still defaulted to -1. Based on the docs, I assume yes. It makes sense to apply it, but I wonder if two properties are needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was like this in the ctor:

template.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);

Yes, it was -1 before 6.1.

Not sure what you are asking about two props, though...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was just wondering if we need a default send timeout as well as a default reply timeout property; since they are two different things; the send timeout is, for example, if a bounded QueueChannel is full whereas the reply timeout is for a request/reply.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean, but from end-user waiting perspective it looks the same, therefore I believe just a single default property is enough.
You can always override per endpoint and per property if there is a different requirements for send and receive.

}
if (!this.replyTimeoutSet) {
this.messagingTemplate.setReceiveTimeout(endpointsDefaultTimeout);
}
this.initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,15 @@ public abstract class AbstractMessageProducingHandler extends AbstractMessageHan

private boolean noHeadersPropagation;

{
this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);
}
private boolean sendTimeoutSet;

/**
* Set the timeout for sending reply Messages.
* @param sendTimeout The send timeout.
*/
public void setSendTimeout(long sendTimeout) {
this.messagingTemplate.setSendTimeout(sendTimeout);
this.sendTimeoutSet = true;
}

@Override
Expand Down Expand Up @@ -189,7 +188,7 @@ protected final void updateNotPropagatedHeaders(String[] headers, boolean merge)
@Override
public Collection<String> getNotPropagatedHeaders() {
return this.notPropagatedHeaders != null
? Collections.unmodifiableSet(new HashSet<>(Arrays.asList(this.notPropagatedHeaders)))
? Set.of(this.notPropagatedHeaders)
: Collections.emptyList();
}

Expand Down Expand Up @@ -217,6 +216,9 @@ protected void onInit() {
}
this.messagingTemplate.setDestinationResolver(getChannelResolver());
setAsyncIfCan();
if (!this.sendTimeoutSet) {
this.messagingTemplate.setSendTimeout(getIntegrationProperties().getEndpointsDefaultTimeout());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

}

private void setAsyncIfCan() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.integration.IntegrationPatternType;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.handler.AbstractMessageHandler;
import org.springframework.integration.support.management.IntegrationManagedResource;
Expand Down Expand Up @@ -63,9 +62,7 @@ public abstract class AbstractMessageRouter extends AbstractMessageHandler imple

private volatile boolean applySequence;

{
this.messagingTemplate.setSendTimeout(IntegrationContextUtils.DEFAULT_TIMEOUT);
}
private boolean sendTimeoutSet;

/**
* Set the default channel where Messages should be sent if channel resolution
Expand Down Expand Up @@ -115,10 +112,11 @@ public void setDefaultOutputChannelName(String defaultOutputChannelName) {
*/
public void setSendTimeout(long timeout) {
this.messagingTemplate.setSendTimeout(timeout);
this.sendTimeoutSet = true;
}

/**
* Specify whether send failures for one or more of the recipients should be ignored. By default this is
* Specify whether send failures for one or more of the recipients should be ignored. By default, this is
* <code>false</code> meaning that an Exception will be thrown whenever a send fails. To override this and suppress
* Exceptions, set the value to <code>true</code>.
* @param ignoreSendFailures true to ignore send failures.
Expand Down Expand Up @@ -174,6 +172,10 @@ protected void onInit() {
if (beanFactory != null) {
this.messagingTemplate.setBeanFactory(beanFactory);
}

if (!this.sendTimeoutSet) {
this.messagingTemplate.setSendTimeout(getIntegrationProperties().getEndpointsDefaultTimeout());
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ScatterGatherHandler extends AbstractReplyProducingMessageHandler i

private String errorChannelName = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME;

private long gatherTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
private Long gatherTimeout;

private AbstractEndpoint gatherEndpoint;

Expand Down Expand Up @@ -119,6 +119,10 @@ public IntegrationPatternType getIntegrationPatternType() {

@Override
protected void doInit() {
if (this.gatherTimeout == null) {
this.gatherTimeout = getIntegrationProperties().getEndpointsDefaultTimeout();
}

BeanFactory beanFactory = getBeanFactory();
if (this.gatherChannel == null) {
this.gatherChannel =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ spring.integration.messagingTemplate.throwExceptionOnLateReply=false
# Defaults to MessageHeaders.ID and MessageHeaders.TIMESTAMP
spring.integration.readOnly.headers=
spring.integration.endpoints.noAutoStartup=
spring.integration.endpoints.defaultTimeout=30000
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,10 @@ public void initializeSubject() {
outputChannel = mock(MessageChannel.class);
handler = new AggregatingMessageHandler(processor, store, correlationStrategy, ReleaseStrategy);
handler.setOutputChannel(outputChannel);
handler.setBeanFactory(mock());
handler.afterPropertiesSet();
}


@Test
public void bufferCompletesNormally() {
String correlationKey = "key";
Expand All @@ -95,7 +96,7 @@ public void bufferCompletesNormally() {
}

@Test
public void bufferCompletesWithException() throws Exception {
public void bufferCompletesWithException() {

doAnswer(new ThrowsException(new RuntimeException("Planned test exception")))
.when(processor).processMessageGroup(isA(SimpleMessageGroup.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void testDefaultResequencerProperties() {
ResequencingMessageHandler resequencer = TestUtils.getPropertyValue(endpoint, "handler",
ResequencingMessageHandler.class);
assertThat(getPropertyValue(resequencer, "outputChannel")).isNull();
assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")).isEqualTo(30000L);
assertThat(getPropertyValue(resequencer, "messagingTemplate.sendTimeout")).isEqualTo(45000L);
assertThat(getPropertyValue(resequencer, "sendPartialResultOnExpiry"))
.as("The ResequencerEndpoint is not configured with the appropriate 'send partial results on " +
"timeout'" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void testAnnotationWithDefaultSettings() {
assertThat(getPropertyValue(aggregator, "releaseStrategy") instanceof SimpleSequenceSizeReleaseStrategy)
.isTrue();
assertThat(getPropertyValue(aggregator, "outputChannel")).isNull();
assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(30000L);
assertThat(getPropertyValue(aggregator, "messagingTemplate.sendTimeout")).isEqualTo(45000L);
assertThat(getPropertyValue(aggregator, "sendPartialResultOnExpiry")).isEqualTo(false);
context.close();
}
Expand All @@ -72,7 +72,7 @@ public void testAnnotationWithCustomSettings() {
}

@Test
public void testAnnotationWithCustomReleaseStrategy() throws Exception {
public void testAnnotationWithCustomReleaseStrategy() {
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(
new String[] {"classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml"});
final String endpointName = "endpointWithDefaultAnnotationAndCustomReleaseStrategy";
Expand All @@ -90,7 +90,7 @@ public void testAnnotationWithCustomReleaseStrategy() throws Exception {
}

@Test
public void testAnnotationWithCustomCorrelationStrategy() throws Exception {
public void testAnnotationWithCustomCorrelationStrategy() {
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(
new String[] {"classpath:/org/springframework/integration/config/annotation/testAnnotatedAggregator.xml"});
final String endpointName = "endpointWithCorrelationStrategy";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class HeaderEnricherParserTests {
void sendTimeoutDefault() {
Object endpoint = context.getBean("headerEnricherWithDefaults");
long sendTimeout = TestUtils.getPropertyValue(endpoint, "handler.messagingTemplate.sendTimeout", Long.class);
assertThat(sendTimeout).isEqualTo(30000L);
assertThat(sendTimeout).isEqualTo(45000L);
}

@Test
Expand Down
Loading