Skip to content

Commit fbe6c34

Browse files
committed
spring-projectsGH-8981: Add UnicastingDispatcher.failoverStrategy option
Fixes: spring-projects#8981 Sometime the simple `boolean failover` on the `MessageChannel` (default `true`) is not enough to be sure that we can dispatch to the next handler or not. Such a decision can be made using `ErrorMessageExceptionTypeRouter`, but that would require an overhaul for the whole integration flow * Introduce a simple `Predicate<Exception> failoverStrategy` into `UnicastingDispatcher` and all its `MessageChannel` implementation consumers to allow to make a decision about next failover according to a thrown exception from the current `MessageHandler` * Expose `failoverStrategy` on the `DirectChannel`, `ExecutorChannel` & `PartitionedChannel`, and add it into respective Java DSL specs * Fix involved tests to rely on the `failoverStrategy` property from now on * Document the new feature
1 parent 809d139 commit fbe6c34

File tree

14 files changed

+230
-130
lines changed

14 files changed

+230
-130
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/DirectChannel.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.channel;
1818

19+
import java.util.function.Predicate;
20+
1921
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
2022
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
2123
import org.springframework.integration.dispatcher.UnicastingDispatcher;
@@ -60,12 +62,26 @@ public DirectChannel(@Nullable LoadBalancingStrategy loadBalancingStrategy) {
6062
/**
6163
* Specify whether the channel's dispatcher should have failover enabled.
6264
* By default, it will. Set this value to 'false' to disable it.
65+
* Overrides {@link #setFailoverStrategy(Predicate)} option.
66+
* In other words: or this, or that option has to be set.
6367
* @param failover The failover boolean.
6468
*/
6569
public void setFailover(boolean failover) {
6670
this.dispatcher.setFailover(failover);
6771
}
6872

73+
/**
74+
* Configure a strategy whether the channel's dispatcher should have failover enabled
75+
* for the exception thrown.
76+
* Overrides {@link #setFailover(boolean)} option.
77+
* In other words: or this, or that option has to be set.
78+
* @param failoverStrategy The failover boolean.
79+
* @since 6.3
80+
*/
81+
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
82+
this.dispatcher.setFailoverStrategy(failoverStrategy);
83+
}
84+
6985
/**
7086
* Specify the maximum number of subscribers supported by the
7187
* channel's dispatcher.

spring-integration-core/src/main/java/org/springframework/integration/channel/ExecutorChannel.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.channel;
1818

1919
import java.util.concurrent.Executor;
20+
import java.util.function.Predicate;
2021

2122
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
2223
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
@@ -49,7 +50,7 @@ public class ExecutorChannel extends AbstractExecutorChannel {
4950

5051
private final LoadBalancingStrategy loadBalancingStrategy;
5152

52-
private boolean failover = true;
53+
private Predicate<Exception> failoverStrategy = (exception) -> true;
5354

5455
/**
5556
* Create an ExecutorChannel that delegates to the provided
@@ -88,8 +89,20 @@ public ExecutorChannel(Executor executor, @Nullable LoadBalancingStrategy loadBa
8889
* @param failover The failover boolean.
8990
*/
9091
public void setFailover(boolean failover) {
91-
this.failover = failover;
92-
getDispatcher().setFailover(failover);
92+
setFailoverStrategy((exception) -> failover);
93+
}
94+
95+
/**
96+
* Configure a strategy whether the channel's dispatcher should have failover enabled
97+
* for the exception thrown.
98+
* Overrides {@link #setFailover(boolean)} option.
99+
* In other words: or this, or that option has to be set.
100+
* @param failoverStrategy The failover boolean.
101+
* @since 6.3
102+
*/
103+
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
104+
this.failoverStrategy = failoverStrategy;
105+
getDispatcher().setFailoverStrategy(failoverStrategy);
93106
}
94107

95108
@Override
@@ -107,7 +120,7 @@ public final void onInit() {
107120
this.executor = new ErrorHandlingTaskExecutor(this.executor, errorHandler);
108121
}
109122
UnicastingDispatcher unicastingDispatcher = new UnicastingDispatcher(this.executor);
110-
unicastingDispatcher.setFailover(this.failover);
123+
unicastingDispatcher.setFailoverStrategy(this.failoverStrategy);
111124
if (this.maxSubscribers == null) {
112125
this.maxSubscribers = getIntegrationProperties().getChannelsMaxUnicastSubscribers();
113126
}

spring-integration-core/src/main/java/org/springframework/integration/channel/PartitionedChannel.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.concurrent.ThreadFactory;
2020
import java.util.function.Function;
21+
import java.util.function.Predicate;
2122

2223
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2324
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
@@ -99,6 +100,18 @@ public void setFailover(boolean failover) {
99100
getDispatcher().setFailover(failover);
100101
}
101102

103+
/**
104+
* Configure a strategy whether the channel's dispatcher should have failover enabled
105+
* for the exception thrown.
106+
* Overrides {@link #setFailover(boolean)} option.
107+
* In other words: or this, or that option has to be set.
108+
* @param failoverStrategy The failover boolean.
109+
* @since 6.3
110+
*/
111+
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
112+
getDispatcher().setFailoverStrategy(failoverStrategy);
113+
}
114+
102115
/**
103116
* Provide a {@link LoadBalancingStrategy} for the {@link PartitionedDispatcher}.
104117
* @param loadBalancingStrategy The load balancing strategy implementation.

spring-integration-core/src/main/java/org/springframework/integration/dispatcher/PartitionedDispatcher.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.locks.Lock;
2929
import java.util.concurrent.locks.ReentrantLock;
3030
import java.util.function.Function;
31+
import java.util.function.Predicate;
3132

3233
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
3334
import org.springframework.lang.Nullable;
@@ -67,7 +68,7 @@ public class PartitionedDispatcher extends AbstractDispatcher {
6768

6869
private ThreadFactory threadFactory = new CustomizableThreadFactory("partition-thread-");
6970

70-
private boolean failover = true;
71+
private Predicate<Exception> failoverStrategy = (exception) -> true;
7172

7273
@Nullable
7374
private LoadBalancingStrategy loadBalancingStrategy;
@@ -108,7 +109,20 @@ public void setThreadFactory(ThreadFactory threadFactory) {
108109
* @param failover The failover boolean.
109110
*/
110111
public void setFailover(boolean failover) {
111-
this.failover = failover;
112+
setFailoverStrategy((exception) -> failover);
113+
}
114+
115+
/**
116+
* Configure a strategy whether the channel's dispatcher should have failover enabled
117+
* for the exception thrown.
118+
* Overrides {@link #setFailover(boolean)} option.
119+
* In other words: or this, or that option has to be set.
120+
* @param failoverStrategy The failover boolean.
121+
* @since 6.3
122+
*/
123+
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
124+
Assert.notNull(failoverStrategy, "'failoverStrategy' must not be null");
125+
this.failoverStrategy = failoverStrategy;
112126
}
113127

114128
/**
@@ -179,7 +193,7 @@ private UnicastingDispatcher newPartition() {
179193
this.executors.add(executor);
180194
DelegateDispatcher delegateDispatcher =
181195
new DelegateDispatcher(new ErrorHandlingTaskExecutor(executor, this.errorHandler));
182-
delegateDispatcher.setFailover(this.failover);
196+
delegateDispatcher.setFailoverStrategy(this.failoverStrategy);
183197
delegateDispatcher.setLoadBalancingStrategy(this.loadBalancingStrategy);
184198
delegateDispatcher.setMessageHandlingTaskDecorator(this.messageHandlingTaskDecorator);
185199
return delegateDispatcher;

spring-integration-core/src/main/java/org/springframework/integration/dispatcher/UnicastingDispatcher.java

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Set;
2323
import java.util.concurrent.Executor;
24+
import java.util.function.Predicate;
2425

2526
import org.springframework.integration.MessageDispatchingException;
2627
import org.springframework.integration.support.utils.IntegrationUtils;
@@ -58,7 +59,7 @@ public class UnicastingDispatcher extends AbstractDispatcher {
5859

5960
private final Executor executor;
6061

61-
private boolean failover = true;
62+
private Predicate<Exception> failoverStrategy = (exception) -> true;
6263

6364
private LoadBalancingStrategy loadBalancingStrategy;
6465

@@ -77,10 +78,25 @@ public UnicastingDispatcher(@Nullable Executor executor) {
7778
* Specify whether this dispatcher should failover when a single
7879
* {@link MessageHandler} throws an Exception. The default value is
7980
* <code>true</code>.
81+
* Overrides {@link #setFailoverStrategy(Predicate)} option.
82+
* In other words: or this, or that option has to be set.
8083
* @param failover The failover boolean.
8184
*/
8285
public void setFailover(boolean failover) {
83-
this.failover = failover;
86+
setFailoverStrategy((exception) -> failover);
87+
}
88+
89+
/**
90+
* Configure a strategy whether the channel's dispatcher should have failover enabled
91+
* for the exception thrown.
92+
* Overrides {@link #setFailover(boolean)} option.
93+
* In other words: or this, or that option has to be set.
94+
* @param failoverStrategy The failover boolean.
95+
* @since 6.3
96+
*/
97+
public void setFailoverStrategy(Predicate<Exception> failoverStrategy) {
98+
Assert.notNull(failoverStrategy, "'failoverStrategy' must not be null");
99+
this.failoverStrategy = failoverStrategy;
84100
}
85101

86102
/**
@@ -154,10 +170,15 @@ private boolean doDispatch(Message<?> message) {
154170
}
155171
exceptions.add(runtimeException);
156172
boolean isLast = !handlerIterator.hasNext();
157-
if (!isLast && this.failover) {
173+
boolean failover = this.failoverStrategy.test(ex);
174+
175+
if (!isLast && failover) {
158176
logExceptionBeforeFailOver(ex, handler, message);
159177
}
160-
handleExceptions(exceptions, message, isLast);
178+
179+
if (isLast || !failover) {
180+
handleExceptions(exceptions, message);
181+
}
161182
}
162183
}
163184
return success;
@@ -187,22 +208,12 @@ else if (this.logger.isInfoEnabled()) {
187208
}
188209
}
189210

190-
/**
191-
* Handles Exceptions that occur while dispatching. If this dispatcher has
192-
* failover enabled, it will only throw an Exception when the handler list
193-
* is exhausted. The 'isLast' flag will be <em>true</em> if the
194-
* Exception occurred during the final iteration of the MessageHandlers.
195-
* If failover is disabled for this dispatcher, it will re-throw any
196-
* Exception immediately.
197-
*/
198-
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message, boolean isLast) {
199-
if (isLast || !this.failover) {
200-
if (allExceptions.size() == 1) {
201-
throw allExceptions.get(0);
202-
}
203-
throw new AggregateMessageDeliveryException(message,
204-
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
211+
private void handleExceptions(List<RuntimeException> allExceptions, Message<?> message) {
212+
if (allExceptions.size() == 1) {
213+
throw allExceptions.get(0);
205214
}
215+
throw new AggregateMessageDeliveryException(message,
216+
"All attempts to deliver Message to MessageHandlers failed.", allExceptions);
206217
}
207218

208219
}

spring-integration-core/src/main/java/org/springframework/integration/dsl/DirectChannelSpec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,8 +28,8 @@ public class DirectChannelSpec extends LoadBalancingChannelSpec<DirectChannelSpe
2828
@Override
2929
protected DirectChannel doGet() {
3030
this.channel = new DirectChannel(this.loadBalancingStrategy);
31-
if (this.failover != null) {
32-
this.channel.setFailover(this.failover);
31+
if (this.failoverStrategy != null) {
32+
this.channel.setFailoverStrategy(this.failoverStrategy);
3333
}
3434
if (this.maxSubscribers != null) {
3535
this.channel.setMaxSubscribers(this.maxSubscribers);

spring-integration-core/src/main/java/org/springframework/integration/dsl/ExecutorChannelSpec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,8 +36,8 @@ protected ExecutorChannelSpec(Executor executor) {
3636
@Override
3737
protected ExecutorChannel doGet() {
3838
this.channel = new ExecutorChannel(this.executor, this.loadBalancingStrategy);
39-
if (this.failover != null) {
40-
this.channel.setFailover(this.failover);
39+
if (this.failoverStrategy != null) {
40+
this.channel.setFailoverStrategy(this.failoverStrategy);
4141
}
4242
if (this.maxSubscribers != null) {
4343
this.channel.setMaxSubscribers(this.maxSubscribers);

spring-integration-core/src/main/java/org/springframework/integration/dsl/LoadBalancingChannelSpec.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import java.util.function.Predicate;
20+
1921
import org.springframework.integration.channel.AbstractMessageChannel;
2022
import org.springframework.integration.dispatcher.LoadBalancingStrategy;
2123
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
@@ -34,7 +36,7 @@ public abstract class LoadBalancingChannelSpec<S extends MessageChannelSpec<S, C
3436

3537
protected LoadBalancingStrategy loadBalancingStrategy = new RoundRobinLoadBalancingStrategy(); // NOSONAR
3638

37-
protected Boolean failover; // NOSONAR
39+
protected Predicate<Exception> failoverStrategy; // NOSONAR
3840

3941
protected Integer maxSubscribers; // NOSONAR
4042

@@ -46,8 +48,20 @@ public S loadBalancer(LoadBalancingStrategy loadBalancingStrategyToSet) {
4648
return _this();
4749
}
4850

49-
public S failover(Boolean failoverToSet) {
50-
this.failover = failoverToSet;
51+
public S failover(boolean failoverToSet) {
52+
return failoverStrategy((exception) -> failoverToSet);
53+
}
54+
55+
/**
56+
* Configure a strategy whether the channel's dispatcher should have failover enabled
57+
* for the exception thrown.
58+
* Overrides {@link #failover(boolean)} option.
59+
* In other words: or this, or that option has to be set.
60+
* @param failoverStrategy The failover boolean.
61+
* @since 6.3
62+
*/
63+
public S failoverStrategy(Predicate<Exception> failoverStrategy) {
64+
this.failoverStrategy = failoverStrategy;
5165
return _this();
5266
}
5367

spring-integration-core/src/main/java/org/springframework/integration/dsl/PartitionedChannelSpec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -63,8 +63,8 @@ protected PartitionedChannel doGet() {
6363
this.channel = new PartitionedChannel(this.partitionCount);
6464
}
6565
this.channel.setLoadBalancingStrategy(this.loadBalancingStrategy);
66-
if (this.failover != null) {
67-
this.channel.setFailover(this.failover);
66+
if (this.failoverStrategy != null) {
67+
this.channel.setFailoverStrategy(this.failoverStrategy);
6868
}
6969
if (this.maxSubscribers != null) {
7070
this.channel.setMaxSubscribers(this.maxSubscribers);

0 commit comments

Comments
 (0)