Skip to content

Commit b576748

Browse files
committed
Improve Delayer docs around messageGroupId
* Mention in the Javadocs of the `DelayHandler`, `DelayerEndpointSpec`, `BaseIntegrationFlowDefinition.delay()`, `GroovyIntegrationFlowDefinition.delay()`, `KotlinIntegrationFlowDefinition.delay()` that `messageGroupId` is required option * Explain in the docs why `messageGroupId` is required and why it cannot rely on a bean name
1 parent 81af20a commit b576748

File tree

6 files changed

+31
-11
lines changed

6 files changed

+31
-11
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/BaseIntegrationFlowDefinition.java

+1
Original file line numberDiff line numberDiff line change
@@ -1208,6 +1208,7 @@ public B delay(String groupId, @Nullable Consumer<DelayerEndpointSpec> endpointC
12081208

12091209
/**
12101210
* Populate a {@link DelayHandler} to the current integration flow position.
1211+
* The {@link DelayerEndpointSpec#messageGroupId(String)} is required option.
12111212
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
12121213
* @return the current {@link BaseIntegrationFlowDefinition}.
12131214
* @since 6.2

spring-integration-core/src/main/java/org/springframework/integration/dsl/DelayerEndpointSpec.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,14 @@
3636

3737
/**
3838
* A {@link ConsumerEndpointSpec} for a {@link DelayHandler}.
39+
* The {@link #messageGroupId(String)} is required option.
3940
*
4041
* @author Artem Bilan
4142
* @author Gary Russell
4243
*
4344
* @since 5.0
45+
*
46+
* @see DelayHandler
4447
*/
4548
public class DelayerEndpointSpec extends ConsumerEndpointSpec<DelayerEndpointSpec, DelayHandler> {
4649

@@ -96,13 +99,12 @@ public DelayerEndpointSpec delayedAdvice(Advice... advice) {
9699
return _this();
97100
}
98101

99-
public DelayerEndpointSpec delayExpression(Expression delayExpression) {
100-
this.handler.setDelayExpression(delayExpression);
101-
return this;
102+
public DelayerEndpointSpec delayExpression(String delayExpression) {
103+
return delayExpression(PARSER.parseExpression(delayExpression));
102104
}
103105

104-
public DelayerEndpointSpec delayExpression(String delayExpression) {
105-
this.handler.setDelayExpression(PARSER.parseExpression(delayExpression));
106+
public DelayerEndpointSpec delayExpression(Expression delayExpression) {
107+
this.handler.setDelayExpression(delayExpression);
106108
return this;
107109
}
108110

@@ -225,12 +227,12 @@ public DelayerEndpointSpec transactionalRelease(TransactionManager transactionMa
225227
* @return the endpoint spec.
226228
*/
227229
public <P> DelayerEndpointSpec delayFunction(Function<Message<P>, Object> delayFunction) {
228-
this.handler.setDelayExpression(new FunctionExpression<>(delayFunction));
229-
return this;
230+
return delayExpression(new FunctionExpression<>(delayFunction));
230231
}
231232

232233
/**
233234
* Set a group id to manage delayed messages by this handler.
235+
* Required.
234236
* @param messageGroupId the group id for delayed messages.
235237
* @return the endpoint spec.
236238
* @since 6.2

spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,24 @@
8383
* Message can be released as soon as five seconds from the current time). If the value is
8484
* a Date, it will be delayed at least until that Date occurs (i.e. the delay in that case
8585
* is equivalent to {@code headerDate.getTime() - new Date().getTime()}).
86+
* <p>
87+
* Delayed messages are stored in the {@link MessageGroupStore} as a dedicated group.
88+
* If an external persistent store is provided, those delayed messages will be rescheduled
89+
* after application startup.
90+
* The {@link #messageGroupId} is required option and must be unique for each delayer
91+
* configuration to avoid work-stealing from the store and unexpected releases.
92+
* Different instances of the same delayer can point to the same message group in the store.
93+
* The {@link #messageGroupId} cannot rely on a bean name which might be generated.
94+
* After application restart the bean may get a different generated name and its delayed
95+
* messages might be lost from reschedule since its group is not managed
96+
* by the application anymore.
8697
*
8798
* @author Mark Fisher
8899
* @author Artem Bilan
89100
* @author Gary Russell
90101
*
91102
* @since 1.0.3
92103
*/
93-
94104
@ManagedResource
95105
@IntegrationManagedResource
96106
public class DelayHandler extends AbstractReplyProducingMessageHandler implements DelayHandlerManagement,
@@ -132,7 +142,7 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement
132142

133143
/**
134144
* Construct an instance with default options.
135-
* The {@link #messageGroupId}must then be provided via the setter.
145+
* The {@link #messageGroupId} must then be provided via the setter.
136146
* @since 6.2
137147
*/
138148
public DelayHandler() {
@@ -165,6 +175,7 @@ public DelayHandler(String messageGroupId, TaskScheduler taskScheduler) {
165175

166176
/**
167177
* Set a group id to manage delayed messages by this handler.
178+
* Required.
168179
* @param messageGroupId the group id for delayed messages.
169180
* @since 6.2
170181
*/

spring-integration-core/src/main/kotlin/org/springframework/integration/dsl/KotlinIntegrationFlowDefinition.kt

+1
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ
573573

574574
/**
575575
* Populate a [DelayHandler] to the current integration flow position.
576+
* The [DelayerEndpointSpec#messageGroupId(String)] is required option.
576577
* @since 6.2
577578
*/
578579
fun delay(endpointConfigurer: DelayerEndpointSpec.() -> Unit) {

spring-integration-groovy/src/main/groovy/org/springframework/integration/groovy/dsl/GroovyIntegrationFlowDefinition.groovy

+2-1
Original file line numberDiff line numberDiff line change
@@ -633,9 +633,10 @@ class GroovyIntegrationFlowDefinition {
633633

634634
/**
635635
* Populate a {@link org.springframework.integration.handler.DelayHandler} to the current integration flow position.
636+
* The {@link DelayerEndpointSpec#messageGroupId(String)} is required option.
636637
* @param endpointConfigurer the {@link Consumer} to provide integration endpoint options.
637-
* @see org.springframework.integration.dsl.DelayerEndpointSpec
638638
* @since 6.2
639+
* @see DelayerEndpointSpec
639640
*/
640641
GroovyIntegrationFlowDefinition delay(
641642
@DelegatesTo(value = DelayerEndpointSpec, strategy = Closure.DELEGATE_FIRST)

src/reference/asciidoc/delayer.adoc

+5-1
Original file line numberDiff line numberDiff line change
@@ -149,12 +149,16 @@ See <<delayer-release-failures>>.
149149
==== Delayer and a Message Store
150150

151151
The `DelayHandler` persists delayed messages into the message group in the provided `MessageStore`.
152-
(The 'groupId' is based on the required 'id' attribute of the `<delayer>` element.)
152+
(The 'groupId' is based on the required 'id' attribute of the `<delayer>` element.
153+
See also `DelayHandler.setMessageGroupId(String)`.)
153154
A delayed message is removed from the `MessageStore` by the scheduled task immediately before the `DelayHandler` sends the message to the `output-channel`.
154155
If the provided `MessageStore` is persistent (such as `JdbcMessageStore`), it provides the ability to not lose messages on the application shutdown.
155156
After application startup, the `DelayHandler` reads messages from its message group in the `MessageStore` and reschedules them with a delay based on the original arrival time of the message (if the delay is numeric).
156157
For messages where the delay header was a `Date`, that `Date` is used when rescheduling.
157158
If a delayed message remains in the `MessageStore` more than its 'delay', it is sent immediately after startup.
159+
The `messageGroupId` is required and cannot rely on a `DelayHandler` bean name which can be generated.
160+
That way, after application restart, a `DelayHandler` may get a new generated bean name.
161+
Therefore, delayed messages might be lost from rescheduling since their group is not managed by the application anymore.
158162

159163
The `<delayer>` can be enriched with either of two mutually exclusive elements: `<transactional>` and `<advice-chain>`.
160164
The `List` of these AOP advices is applied to the proxied internal `DelayHandler.ReleaseMessageHandler`, which has the responsibility to release the message, after the delay, on a `Thread` of the scheduled task.

0 commit comments

Comments
 (0)