-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Improve Delayer DSL #8645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve Delayer DSL #8645
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2002-2022 the original author or authors. | ||
* Copyright 2002-2023 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -28,6 +28,8 @@ | |
import java.util.concurrent.ConcurrentMap; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.locks.Lock; | ||
import java.util.concurrent.locks.ReentrantLock; | ||
import java.util.stream.Stream; | ||
|
||
import org.aopalliance.aop.Advice; | ||
|
@@ -98,10 +100,12 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement | |
|
||
public static final long DEFAULT_RETRY_DELAY = 1_000; | ||
|
||
private final String messageGroupId; | ||
|
||
private final ConcurrentMap<String, AtomicInteger> deliveries = new ConcurrentHashMap<>(); | ||
|
||
private final Lock removeReleasedMessageLock = new ReentrantLock(); | ||
|
||
private String messageGroupId; | ||
|
||
private long defaultDelay; | ||
|
||
private Expression delayExpression; | ||
|
@@ -126,6 +130,14 @@ public class DelayHandler extends AbstractReplyProducingMessageHandler implement | |
|
||
private long retryDelay = DEFAULT_RETRY_DELAY; | ||
|
||
/** | ||
* Construct an instance with default options. | ||
* The {@link #messageGroupId} must be provided then via setter. | ||
* @since 6.2 | ||
*/ | ||
public DelayHandler() { | ||
} | ||
|
||
/** | ||
* Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for | ||
* {@link MessageGroup} to store delayed Messages in the {@link MessageGroupStore}. | ||
|
@@ -151,6 +163,15 @@ public DelayHandler(String messageGroupId, TaskScheduler taskScheduler) { | |
setTaskScheduler(taskScheduler); | ||
} | ||
|
||
/** | ||
* Set a group id to manage delayed messages by this handler. | ||
* @param messageGroupId the group id for delayed messages. | ||
* @since 6.2 | ||
*/ | ||
public void setMessageGroupId(String messageGroupId) { | ||
this.messageGroupId = messageGroupId; | ||
} | ||
|
||
/** | ||
* Set the default delay in milliseconds. If no {@code delayExpression} property has | ||
* been provided, the default delay will be applied to all Messages. If a delay should | ||
|
@@ -187,10 +208,10 @@ public void setDelayExpressionString(String delayExpression) { | |
|
||
/** | ||
* Specify whether {@code Exceptions} thrown by {@link #delayExpression} evaluation | ||
* should be ignored (only logged). In this case case the delayer will fall back to | ||
* the to the {@link #defaultDelay}. If this property is specified as {@code false}, | ||
* should be ignored (only logged). In this case the delayer will fall back to | ||
* the {@link #defaultDelay}. If this property is specified as {@code false}, | ||
* any {@link #delayExpression} evaluation {@code Exception} will be thrown to the | ||
* caller without falling back to the to the {@link #defaultDelay}. Default is | ||
* caller without falling back to the {@link #defaultDelay}. Default is | ||
* {@code true}. | ||
* @param ignoreExpressionFailures true if expression evaluation failures should be | ||
* ignored. | ||
|
@@ -297,6 +318,8 @@ public IntegrationPatternType getIntegrationPatternType() { | |
|
||
@Override | ||
protected void doInit() { | ||
Assert.notNull(this.messageGroupId, "The 'messageGroupId' must be provided"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Either "A 'messageGroupId'..." or "The 'messageGroupId' property..." |
||
|
||
if (this.messageStore == null) { | ||
this.messageStore = new SimpleMessageStore(); | ||
} | ||
|
@@ -552,7 +575,8 @@ private void doReleaseMessage(Message<?> message) { | |
|
||
private boolean removeDelayedMessageFromMessageStore(Message<?> message) { | ||
if (this.messageStore instanceof SimpleMessageStore) { | ||
synchronized (this.messageGroupId) { | ||
this.removeReleasedMessageLock.lock(); | ||
try { | ||
Collection<Message<?>> messages = this.messageStore.getMessageGroup(this.messageGroupId).getMessages(); | ||
if (messages.contains(message)) { | ||
this.messageStore.removeMessagesFromGroup(this.messageGroupId, message); | ||
|
@@ -562,6 +586,9 @@ private boolean removeDelayedMessageFromMessageStore(Message<?> message) { | |
return false; | ||
} | ||
} | ||
finally { | ||
this.removeReleasedMessageLock.unlock(); | ||
} | ||
} | ||
else { | ||
return ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -561,10 +561,23 @@ class KotlinIntegrationFlowDefinition(@PublishedApi internal val delegate: Integ | |
/** | ||
* Populate a [DelayHandler] to the current integration flow position. | ||
*/ | ||
@Deprecated("since 6.2", | ||
ReplaceWith("""delay { | ||
messageGroupId(groupId) | ||
}""")) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Strange indentation. |
||
@Suppress("DEPRECATION") | ||
fun delay(groupId: String, endpointConfigurer: DelayerEndpointSpec.() -> Unit = {}) { | ||
this.delegate.delay(groupId, endpointConfigurer) | ||
} | ||
|
||
/** | ||
* Populate a [DelayHandler] to the current integration flow position. | ||
* @since 6.2 | ||
*/ | ||
fun delay(endpointConfigurer: DelayerEndpointSpec.() -> Unit) { | ||
this.delegate.delay(endpointConfigurer) | ||
} | ||
|
||
/** | ||
* Populate a [org.springframework.integration.transformer.ContentEnricher] | ||
* to the current integration flow position | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,7 +35,10 @@ flowLambda() { | |
wireTap integrationFlow { | ||
channel { queue 'wireTapChannel' } | ||
} | ||
delay 'delayGroup', { defaultDelay 100 } | ||
delay { | ||
messageGroupId 'delayGroup' | ||
defaultDelay 100 | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix indentation. |
||
transform String, { it.toUpperCase() } | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.