Skip to content

Commit bfc6931

Browse files
adel-haidarartembilan
authored andcommitted
GH-3869: Add ContextHolderRequestHandlerAdvice
Fixes #3869 * Move `ContextHolderRequestHandlerAdvice` to the `core` module for more general purposes * Add `ContextHolderRequestHandlerAdviceTests` * Rework `DelegatingSessionFactoryTests` to rely on the `ContextHolderRequestHandlerAdvice`. This allows us to remove unnecessary XML configuration for this test class * Document the feature
1 parent b0093ef commit bfc6931

File tree

8 files changed

+201
-53
lines changed

8 files changed

+201
-53
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import java.util.function.Consumer;
20+
import java.util.function.Function;
21+
22+
import org.springframework.messaging.Message;
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
* An {@link AbstractRequestHandlerAdvice} implementation to store and reset
27+
* a value into/from some context (e.g. {@link ThreadLocal}) against a request message.
28+
* The context is populated before {@code callback.execute()} and reset after.
29+
*
30+
* @author Adel Haidar
31+
* @author Artem Bilan
32+
*
33+
* @since 6.1
34+
*/
35+
public class ContextHolderRequestHandlerAdvice extends AbstractRequestHandlerAdvice {
36+
37+
public final Function<Message<?>, Object> valueProvider;
38+
39+
public final Consumer<Object> contextSetHook;
40+
41+
public final Runnable contextClearHook;
42+
43+
private boolean enableContextPropagation;
44+
45+
/**
46+
* Construct an instance based on the provided hooks.
47+
* @param valueProvider The key provider function.
48+
* @param contextSetHook The context set hook consumer.
49+
* @param contextClearHook The context clear hook consumer.
50+
*/
51+
public ContextHolderRequestHandlerAdvice(Function<Message<?>, Object> valueProvider,
52+
Consumer<Object> contextSetHook, Runnable contextClearHook) {
53+
54+
Assert.notNull(valueProvider, "'valueProvider' must not be null");
55+
Assert.notNull(contextSetHook, "'contextSetHook' must not be null");
56+
Assert.notNull(contextClearHook, "'contextClearHook' must not be null");
57+
this.valueProvider = valueProvider;
58+
this.contextSetHook = contextSetHook;
59+
this.contextClearHook = contextClearHook;
60+
}
61+
62+
@Override
63+
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
64+
Object value = this.valueProvider.apply(message);
65+
logger.trace(() -> "Setting context value to: " + value + " from message: " + message);
66+
try {
67+
this.contextSetHook.accept(value);
68+
return callback.execute();
69+
}
70+
finally {
71+
this.contextClearHook.run();
72+
}
73+
}
74+
75+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.handler.advice;
18+
19+
import java.util.concurrent.atomic.AtomicReference;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.aop.framework.ProxyFactoryBean;
24+
import org.springframework.messaging.MessageHandler;
25+
import org.springframework.messaging.support.GenericMessage;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
29+
/**
30+
* @author Artem Bilan
31+
*
32+
* @since 6.1
33+
*/
34+
public class ContextHolderRequestHandlerAdviceTests {
35+
36+
@Test
37+
void contextHolderRequestHandlerAdviceInAction() {
38+
AtomicReference<Object> context = new AtomicReference<>();
39+
40+
AtomicReference<Object> valueFromHandler = new AtomicReference<>();
41+
MessageHandler testHandler = message -> valueFromHandler.set(context.get());
42+
43+
String testContextValue = "test data";
44+
45+
ContextHolderRequestHandlerAdvice contextHolderRequestHandlerAdvice =
46+
new ContextHolderRequestHandlerAdvice(m -> testContextValue, context::set, () -> context.set(null));
47+
48+
ProxyFactoryBean fb = new ProxyFactoryBean();
49+
fb.setTarget(testHandler);
50+
fb.addAdvice(contextHolderRequestHandlerAdvice);
51+
testHandler = (MessageHandler) fb.getObject();
52+
53+
testHandler.handleMessage(new GenericMessage<>(""));
54+
55+
assertThat(valueFromHandler.get()).isEqualTo(testContextValue);
56+
assertThat(context.get()).isNull();
57+
}
58+
59+
}

spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/DelegatingSessionFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2021 the original author or authors.
2+
* Copyright 2015-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
* @param <F> the target system file type.
2929
*
3030
* @author Gary Russell
31+
*
3132
* @since 4.2
3233
*
3334
*/
@@ -52,7 +53,7 @@ public DelegatingSessionFactory(Map<Object, SessionFactory<F>> factories, Sessio
5253
* @param factoryLocator the factory.
5354
*/
5455
public DelegatingSessionFactory(SessionFactoryLocator<F> factoryLocator) {
55-
Assert.notNull(factoryLocator, "'factoryFactory' cannot be null");
56+
Assert.notNull(factoryLocator, "'factoryLocator' cannot be null");
5657
this.factoryLocator = factoryLocator;
5758
}
5859

spring-integration-file/src/test/java/org/springframework/integration/file/remote/session/DelegatingSessionFactoryTests.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,12 @@
2626
import org.springframework.beans.factory.annotation.Autowired;
2727
import org.springframework.context.annotation.Bean;
2828
import org.springframework.context.annotation.Configuration;
29-
import org.springframework.context.annotation.ImportResource;
3029
import org.springframework.integration.annotation.ServiceActivator;
30+
import org.springframework.integration.channel.QueueChannel;
3131
import org.springframework.integration.config.EnableIntegration;
3232
import org.springframework.integration.file.remote.AbstractFileInfo;
3333
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
34+
import org.springframework.integration.handler.advice.ContextHolderRequestHandlerAdvice;
3435
import org.springframework.integration.test.util.TestUtils;
3536
import org.springframework.messaging.Message;
3637
import org.springframework.messaging.MessageChannel;
@@ -102,8 +103,6 @@ public void testFlow() throws Exception {
102103
}
103104

104105
@Configuration
105-
@ImportResource(
106-
"classpath:/org/springframework/integration/file/remote/session/delegating-session-factory-context.xml")
107106
@EnableIntegration
108107
public static class Config {
109108

@@ -132,11 +131,21 @@ public SessionFactoryLocator<String> sessionFactoryLocator() {
132131
return new DefaultSessionFactoryLocator<>(factories, bar);
133132
}
134133

135-
@ServiceActivator(inputChannel = "c1")
134+
@Bean
135+
QueueChannel out() {
136+
return new QueueChannel();
137+
}
138+
139+
@Bean
140+
ContextHolderRequestHandlerAdvice contextHolderRequestHandlerAdvice(DelegatingSessionFactory<String> dsf) {
141+
return new ContextHolderRequestHandlerAdvice(Message::getPayload, dsf::setThreadKey, dsf::clearThreadKey);
142+
}
143+
144+
@ServiceActivator(inputChannel = "in", adviceChain = "contextHolderRequestHandlerAdvice")
136145
@Bean
137146
MessageHandler handler() {
138147
AbstractRemoteFileOutboundGateway<String> gateway =
139-
new AbstractRemoteFileOutboundGateway<String>(dsf(), "ls", "payload") {
148+
new AbstractRemoteFileOutboundGateway<>(dsf(), "ls", "payload") {
140149

141150
@Override
142151
protected boolean isDirectory(String file) {
@@ -172,8 +181,9 @@ protected List<AbstractFileInfo<String>> asFileInfoList(Collection<String> files
172181
protected String enhanceNameWithSubDirectory(String file, String directory) {
173182
return null;
174183
}
184+
175185
};
176-
gateway.setOutputChannelName("c2");
186+
gateway.setOutputChannelName("out");
177187
gateway.setOptions("-1");
178188
return gateway;
179189
}

spring-integration-file/src/test/java/org/springframework/integration/file/remote/session/delegating-session-factory-context.xml

Lines changed: 0 additions & 20 deletions
This file was deleted.

src/reference/asciidoc/ftp.adoc

Lines changed: 2 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -231,33 +231,10 @@ private static final class SharedSSLFTPSClient extends FTPSClient {
231231

232232
Version 4.2 introduced the `DelegatingSessionFactory`, which allows the selection of the actual session factory at runtime.
233233
Prior to invoking the FTP endpoint, call `setThreadKey()` on the factory to associate a key with the current thread.
234-
That key is then used to lookup the actual session factory to be used.
234+
That key is then used to look up the actual session factory to be used.
235235
You can clear the key by calling `clearThreadKey()` after use.
236236

237-
We added convenience methods so that you can easily do use a delegating session factory from a message flow.
238-
239-
The following example shows how to declare a delegating session factory:
240-
241-
====
242-
[source, xml]
243-
----
244-
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
245-
<constructor-arg>
246-
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
247-
<!-- delegate factories here -->
248-
</bean>
249-
</constructor-arg>
250-
</bean>
251-
252-
<int:service-activator input-channel="in" output-channel="c1"
253-
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
254-
255-
<int-ftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
256-
257-
<int:service-activator input-channel="c2" output-channel="out"
258-
expression="@dsf.clearThreadKey(#root)" />
259-
----
260-
====
237+
See <<./handler-advice.adoc#context-holder-advice, Context Holder Advice>> for more information how this factory can be used together with a `ContextHolderRequestHandlerAdvice`.
261238

262239
IMPORTANT: When you use session caching (see <<ftp-session-caching>>), each of the delegates should be cached.
263240
You cannot cache the `DelegatingSessionFactory` itself.

src/reference/asciidoc/handler-advice.adoc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ In addition to providing the general mechanism to apply AOP advice classes, Spri
5757
* `RateLimiterRequestHandlerAdvice` (described in <<rate-limiter-advice>>)
5858
* `CacheRequestHandlerAdvice` (described in <<cache-advice>>)
5959
* `ReactiveRequestHandlerAdvice` (described in <<reactive-advice>>)
60+
* `ContextHolderRequestHandlerAdvice` (described in <<context-holder-advice>>)
61+
62+
[[expression-advice]]
6063

6164
[[retry-advice]]
6265
===== Retry Advice
@@ -571,6 +574,44 @@ The `message` argument is the request message for the message handler and can be
571574
The `mono` argument is the result of this message handler's `handleRequestMessage()` method implementation.
572575
A nested `Mono.transform()` can also be called from this function to apply, for example, a https://spring.io/projects/spring-cloud-circuitbreaker[Reactive Circuit Breaker].
573576

577+
[[context-holder-advice]]
578+
==== Context Holder Advice
579+
580+
Starting with version 6.1, the `ContextHolderRequestHandlerAdvice` has been introduced.
581+
This advice takes some value from the request message as and stores it in the context holder.
582+
The value is clear from the context when an execution is finished on the target `MessageHandler`.
583+
The best way to think about this advice is similar to the programming flow where we store some value into a `ThreadLocal`, get access to it from the target call and then clean up the `ThreadLocal` after execution.
584+
The `ContextHolderRequestHandlerAdvice` requires these constructor arguments: a `Function<Message<?>, Object>` as a value provider, `Consumer<Object>` as a context set callback and `Runnable` as a context clean up hook.
585+
586+
Following is a sample how a `ContextHolderRequestHandlerAdvice` can be used in combination with a `o.s.i.file.remote.session.DelegatingSessionFactory`:
587+
588+
====
589+
[source, java]
590+
----
591+
@Bean
592+
DelegatingSessionFactory<?> dsf(SessionFactory<?> one, SessionFactory<?> two) {
593+
return new DelegatingSessionFactory<>(Map.of("one", one, "two", two), null);
594+
}
595+
596+
@Bean
597+
ContextHolderRequestHandlerAdvice contextHolderRequestHandlerAdvice(DelegatingSessionFactory<String> dsf) {
598+
return new ContextHolderRequestHandlerAdvice(message -> message.getHeaders().get("FACTORY_KEY"),
599+
dsf::setThreadKey, dsf::clearThreadKey);
600+
}
601+
602+
@ServiceActivator(inputChannel = "in", adviceChain = "contextHolderRequestHandlerAdvice")
603+
FtpOutboundGateway ftpOutboundGateway(DelegatingSessionFactory<?> sessionFactory) {
604+
return new FtpOutboundGateway(sessionFactory, "ls", "payload");
605+
}
606+
----
607+
====
608+
609+
And it is just enough to send a message to the `in` channel with a `FACTORY_KEY` header set to either `one` or `two`.
610+
The `ContextHolderRequestHandlerAdvice` sets the value from that header into a `DelegatingSessionFactory` via its `setThreadKey`.
611+
Then when `FtpOutboundGateway` executes an `ls` command a proper delegating `SessionFactory` is chosen from the `DelegatingSessionFactory` according to the value in its `ThreadLocal`.
612+
When the result is produced from the `FtpOutboundGateway`, a `ThreadLocal` value in the `DelegatingSessionFactory` is cleared according to the `clearThreadKey()` call from the `ContextHolderRequestHandlerAdvice`.
613+
See <<./ftp.adoc#ftp-dsf,Delegating Session Factory>> for more information.
614+
574615
[[custom-advice]]
575616
==== Custom Advice Classes
576617

src/reference/asciidoc/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ In general the project has been moved to the latest dependency versions.
2323
The Zip Spring Integration Extension project has been migrated as the `spring-integration-zip` module.
2424
See <<./zip.adoc#zip,Zip Support>> for more information.
2525

26+
[[x6.1-context-holder-advice]]
27+
==== `ContextHolderRequestHandlerAdvice`
28+
29+
The `ContextHolderRequestHandlerAdvice` allows to store a value from a request message into some context around `MessageHandler` execution.
30+
See <<./handler-advice.adoc#context-holder-advice, Context Holder Advice>> for more information.
2631

2732
[[x6.1-general]]
2833
=== General Changes

0 commit comments

Comments
 (0)