Skip to content

Commit ddddd1d

Browse files
saxenajartembilan
authored andcommitted
GH-3936: WebFluxMH: Add request attribute support
Fixes #3936 I am passing request attributes which is getting used in exchangefilter to influence the flow. This request attribute is of type String as Key and Value as a user defined object. My expectation is to pass this information in request attributes so that it will eventually available in exchangefilter for further processing but i dont find a way to pass these request attribute in webflux integration. * Add webclient request attributes into `WebFluxRequestExecutingMessageHandler` * Improve code style and docs
1 parent 91007e6 commit ddddd1d

File tree

8 files changed

+164
-12
lines changed

8 files changed

+164
-12
lines changed

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/config/WebFluxOutboundChannelAdapterParser.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import org.w3c.dom.Element;
2020

21+
import org.springframework.beans.factory.config.BeanDefinition;
2122
import org.springframework.beans.factory.config.RuntimeBeanReference;
2223
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
2324
import org.springframework.beans.factory.xml.ParserContext;
@@ -31,6 +32,7 @@
3132
* Parser for the 'outbound-channel-adapter' element of the webflux namespace.
3233
*
3334
* @author Artem Bilan
35+
* @author Jatin Saxena
3436
*
3537
* @since 5.0
3638
*/
@@ -79,6 +81,14 @@ else if (hasTypeExpression) {
7981
.getBeanDefinition());
8082
}
8183

84+
BeanDefinition attributeVariablesExpressionDef =
85+
IntegrationNamespaceUtils.createExpressionDefIfAttributeDefined("attribute-variables-expression",
86+
element);
87+
88+
if (attributeVariablesExpressionDef != null) {
89+
builder.addPropertyValue("attributeVariablesExpression", attributeVariablesExpressionDef);
90+
}
91+
8292
return builder;
8393
}
8494

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java

+39
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.core.io.Resource;
2828
import org.springframework.expression.Expression;
2929
import org.springframework.expression.common.LiteralExpression;
30+
import org.springframework.expression.spel.support.StandardEvaluationContext;
3031
import org.springframework.http.HttpEntity;
3132
import org.springframework.http.HttpMethod;
3233
import org.springframework.http.HttpStatusCode;
@@ -35,11 +36,13 @@
3536
import org.springframework.http.ResponseEntity;
3637
import org.springframework.http.client.reactive.ClientHttpRequest;
3738
import org.springframework.http.client.reactive.ClientHttpResponse;
39+
import org.springframework.integration.expression.ExpressionUtils;
3840
import org.springframework.integration.expression.ValueExpression;
3941
import org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler;
4042
import org.springframework.lang.Nullable;
4143
import org.springframework.messaging.Message;
4244
import org.springframework.util.Assert;
45+
import org.springframework.util.CollectionUtils;
4346
import org.springframework.util.MultiValueMap;
4447
import org.springframework.web.reactive.function.BodyExtractor;
4548
import org.springframework.web.reactive.function.BodyExtractors;
@@ -57,6 +60,7 @@
5760
* @author Artem Bilan
5861
* @author Gary Russell
5962
* @author David Graff
63+
* @author Jatin Saxena
6064
*
6165
* @since 5.0
6266
*
@@ -75,6 +79,11 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx
7579

7680
private Expression publisherElementTypeExpression;
7781

82+
private Expression attributeVariablesExpression;
83+
84+
private StandardEvaluationContext evaluationContext;
85+
86+
7887
/**
7988
* Create a handler that will send requests to the provided URI.
8089
* @param uri The URI.
@@ -193,11 +202,28 @@ public void setPublisherElementTypeExpression(Expression publisherElementTypeExp
193202
this.publisherElementTypeExpression = publisherElementTypeExpression;
194203
}
195204

205+
/**
206+
* Configure expression to evaluate request attribute which will be added to webclient request attribute.
207+
* @param attributeVariablesExpression the expression to evaluate request attribute.
208+
* @since 6.0
209+
* @see WebClient.RequestBodySpec#attributes
210+
*/
211+
public void setAttributeVariablesExpression(Expression attributeVariablesExpression) {
212+
Assert.notNull(attributeVariablesExpression, "'attributeVariablesExpression' must not be null");
213+
this.attributeVariablesExpression = attributeVariablesExpression;
214+
}
215+
196216
@Override
197217
public String getComponentType() {
198218
return (isExpectReply() ? "webflux:outbound-gateway" : "webflux:outbound-channel-adapter");
199219
}
200220

221+
@Override
222+
protected final void doInit() {
223+
super.doInit();
224+
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
225+
}
226+
201227
@Override
202228
@Nullable
203229
protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity<?> httpRequest,
@@ -230,13 +256,26 @@ private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod h
230256
}
231257

232258
requestSpec = requestSpec.headers(headers -> headers.putAll(httpRequest.getHeaders()));
259+
260+
if (this.attributeVariablesExpression != null) {
261+
Map<String, Object> attributeMap = evaluateAttributeVariables(requestMessage);
262+
if (!CollectionUtils.isEmpty(attributeMap)) {
263+
requestSpec = requestSpec.attributes(map -> map.putAll(attributeMap));
264+
}
265+
}
266+
233267
BodyInserter<?, ? super ClientHttpRequest> inserter = buildBodyInserterForRequest(requestMessage, httpRequest);
234268
if (inserter != null) {
235269
requestSpec.body(inserter);
236270
}
237271
return requestSpec;
238272
}
239273

274+
@SuppressWarnings("unchecked")
275+
private Map<String, Object> evaluateAttributeVariables(Message<?> requestMessage) {
276+
return this.attributeVariablesExpression.getValue(this.evaluationContext, requestMessage, Map.class);
277+
}
278+
240279
@Nullable
241280
private BodyInserter<?, ? super ClientHttpRequest> buildBodyInserterForRequest(Message<?> requestMessage,
242281
HttpEntity<?> httpRequest) {

spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux.xsd

+14
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,13 @@
419419
</xsd:documentation>
420420
</xsd:annotation>
421421
</xsd:attribute>
422+
<xsd:attribute name="attribute-variables-expression" type="xsd:string">
423+
<xsd:annotation>
424+
<xsd:documentation><![CDATA[
425+
Specifies the SpEL expression to be evaluated as a Map for request attributes.
426+
]]></xsd:documentation>
427+
</xsd:annotation>
428+
</xsd:attribute>
422429
</xsd:complexType>
423430
</xsd:element>
424431

@@ -592,6 +599,13 @@
592599
<xsd:union memberTypes="xsd:boolean xsd:string"/>
593600
</xsd:simpleType>
594601
</xsd:attribute>
602+
<xsd:attribute name="attribute-variables-expression" type="xsd:string">
603+
<xsd:annotation>
604+
<xsd:documentation><![CDATA[
605+
Specifies the SpEL expression to be evaluated as a Map for request attributes.
606+
]]></xsd:documentation>
607+
</xsd:annotation>
608+
</xsd:attribute>
595609
</xsd:extension>
596610
</xsd:complexContent>
597611
</xsd:complexType>

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests-context.xml

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
reply-payload-to-flux="true"
4040
body-extractor="bodyExtractor"
4141
publisher-element-type-expression="headers.elementType"
42-
extract-response-body="false">
42+
extract-response-body="false"
43+
attribute-variables-expression="{name:{first:'Nikola',last:'Tesla'},dob:{day:10,month:'July',year:1856}}">
4344
<uri-variable name="foo" expression="headers.bar"/>
4445
</outbound-gateway>
4546

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests.java

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
/**
4242
* @author Artem Bilan
43+
* @author Jatin Saxena
4344
*
4445
* @since 5.0
4546
*/
@@ -129,6 +130,8 @@ public void reactiveFullConfig() {
129130
.isEqualTo("headers.elementType");
130131
assertThat(handlerAccessor.getPropertyValue("extractResponseBody"))
131132
.isEqualTo(false);
133+
assertThat(handlerAccessor.getPropertyValue("attributeVariablesExpression.expression"))
134+
.isEqualTo("{name:{first:'Nikola',last:'Tesla'},dob:{day:10,month:'July',year:1856}}");
132135
}
133136

134137
}

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java

+77-10
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@
1616

1717
package org.springframework.integration.webflux.outbound;
1818

19-
import java.nio.charset.StandardCharsets;
2019
import java.time.Duration;
20+
import java.util.Map;
21+
import java.util.concurrent.atomic.AtomicReference;
2122

2223
import org.junit.jupiter.api.Test;
2324
import reactor.core.publisher.Flux;
2425
import reactor.core.publisher.Mono;
2526
import reactor.test.StepVerifier;
2627

28+
import org.springframework.beans.factory.BeanFactory;
2729
import org.springframework.core.io.buffer.DataBuffer;
2830
import org.springframework.core.io.buffer.DataBufferFactory;
2931
import org.springframework.core.io.buffer.DataBufferLimitException;
32+
import org.springframework.expression.Expression;
33+
import org.springframework.expression.spel.standard.SpelExpressionParser;
3034
import org.springframework.http.HttpStatus;
3135
import org.springframework.http.MediaType;
3236
import org.springframework.http.client.reactive.ClientHttpConnector;
@@ -40,16 +44,22 @@
4044
import org.springframework.messaging.MessageHandlingException;
4145
import org.springframework.messaging.support.ErrorMessage;
4246
import org.springframework.test.web.reactive.server.HttpHandlerConnector;
47+
import org.springframework.web.reactive.function.client.ClientRequest;
48+
import org.springframework.web.reactive.function.client.ClientResponse;
49+
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
50+
import org.springframework.web.reactive.function.client.ExchangeFunction;
4351
import org.springframework.web.reactive.function.client.ExchangeStrategies;
4452
import org.springframework.web.reactive.function.client.WebClient;
4553
import org.springframework.web.reactive.function.client.WebClientResponseException;
4654

4755
import static org.assertj.core.api.Assertions.assertThat;
56+
import static org.mockito.Mockito.mock;
4857

4958
/**
5059
* @author Shiliang Li
5160
* @author Artem Bilan
5261
* @author David Graff
62+
* @author Jatin Saxena
5363
*
5464
* @since 5.0
5565
*/
@@ -289,15 +299,16 @@ void testClientHttpResponseErrorAsReply() {
289299

290300
Flux<DataBuffer> data =
291301
Flux.just(
292-
bufferFactory.wrap("{".getBytes(StandardCharsets.UTF_8)),
293-
bufferFactory.wrap(" \"error\": \"Not Found\",".getBytes(StandardCharsets.UTF_8)),
294-
bufferFactory.wrap(" \"message\": \"404 NOT_FOUND\",".getBytes(StandardCharsets.UTF_8)),
295-
bufferFactory.wrap(" \"path\": \"/spring-integration\",".getBytes(StandardCharsets.UTF_8)),
296-
bufferFactory.wrap(" \"status\": 404,".getBytes(StandardCharsets.UTF_8)),
297-
bufferFactory.wrap(" \"timestamp\": \"1970-01-01T00:00:00.000+00:00\",".getBytes(StandardCharsets.UTF_8)),
298-
bufferFactory.wrap(" \"trace\": \"some really\nlong\ntrace\",".getBytes(StandardCharsets.UTF_8)),
299-
bufferFactory.wrap("}".getBytes(StandardCharsets.UTF_8))
300-
);
302+
"{",
303+
" \"error\": \"Not Found\",",
304+
" \"message\": \"404 NOT_FOUND\",",
305+
" \"path\": \"/spring-integration\",",
306+
" \"status\": 404,",
307+
" \"timestamp\": \"1970-01-01T00:00:00.000+00:00\",",
308+
" \"trace\": \"some really\nlong\ntrace\",",
309+
"}")
310+
.map(String::getBytes)
311+
.map(bufferFactory::wrap);
301312

302313
return response.writeWith(data)
303314
.then(Mono.defer(response::setComplete));
@@ -376,4 +387,60 @@ void testMaxInMemorySizeExceeded() {
376387
.isEqualTo("Exceeded limit on max bytes to buffer : 1");
377388
}
378389

390+
@Test
391+
@SuppressWarnings("unchecked")
392+
void testFluxReplyWithRequestAttribute() {
393+
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
394+
response.setStatusCode(HttpStatus.OK);
395+
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
396+
397+
DataBufferFactory bufferFactory = response.bufferFactory();
398+
399+
Mono<DataBuffer> data = Mono.just(bufferFactory.wrap("foo\nbar\nbaz".getBytes()));
400+
401+
return response.writeWith(data)
402+
.then(Mono.defer(response::setComplete));
403+
});
404+
405+
class AttributeFilter implements ExchangeFilterFunction {
406+
407+
final AtomicReference<Object> attributeValueName = new AtomicReference<>();
408+
409+
@Override
410+
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
411+
this.attributeValueName.set(request.attribute("name").orElse(null));
412+
return next.exchange(request);
413+
}
414+
}
415+
416+
AttributeFilter attributeFilter = new AttributeFilter();
417+
WebClient webClient = WebClient.builder()
418+
.clientConnector(httpConnector)
419+
.filter(attributeFilter)
420+
.build();
421+
422+
String destinationUri = "https://www.springsource.org/spring-integration";
423+
WebFluxRequestExecutingMessageHandler reactiveHandler =
424+
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient);
425+
426+
QueueChannel replyChannel = new QueueChannel();
427+
reactiveHandler.setOutputChannel(replyChannel);
428+
reactiveHandler.setExpectedResponseType(String.class);
429+
reactiveHandler.setReplyPayloadToFlux(true);
430+
Expression expr = new SpelExpressionParser().parseExpression("{name:{first:'Nikola'}}");
431+
reactiveHandler.setAttributeVariablesExpression(expr);
432+
reactiveHandler.setBeanFactory(mock(BeanFactory.class));
433+
reactiveHandler.afterPropertiesSet();
434+
435+
reactiveHandler.handleMessage(MessageBuilder.withPayload(Mono.just("hello, world")).build());
436+
437+
Message<?> receive = replyChannel.receive(10_000);
438+
439+
assertThat(attributeFilter.attributeValueName.get()).isNotNull();
440+
441+
Map<String, String> attributeValueNameMap = (Map<String, String>) attributeFilter.attributeValueName.get();
442+
assertThat(attributeValueNameMap.get("first")).isEqualTo("Nikola");
443+
assertThat(receive).isNotNull();
444+
}
445+
379446
}

src/reference/asciidoc/webflux.adoc

+8
Original file line numberDiff line numberDiff line change
@@ -319,3 +319,11 @@ See <<./http.adoc#http-outbound,HTTP Outbound Components>> for more possible con
319319

320320
Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping.
321321
See <<./http.adoc#http-header-mapping,HTTP Header Mappings>> for more possible options and components to use for mapping headers.
322+
323+
[[webflux-request-attributes]]
324+
=== WebFlux Request Attributes
325+
326+
Starting with version 6.0, the `WebFluxRequestExecutingMessageHandler` can be configured to evaluate request attributes via `setAttributeVariablesExpression()`.
327+
This SpEL expression must be evaluated in `Map`.
328+
Such a map is then propagated to the `WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)` HTTP request configuration callback.
329+
This will be helpful if an information in a form of key-value object needs to be passed from `Message` to request and downstream filter will get access to these attributes for further processing.

src/reference/asciidoc/whats-new.adoc

+10
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,13 @@ See <<./jms.adoc#jms,JMS Support>> for more information.
202202
The `ChannelSecurityInterceptor` and its annotation `@SecuredChannel` and XML `<secured-channels>` configurations have been deprecated in favor of `AuthorizationChannelInterceptor`.
203203

204204
See <<./security.adoc#security,Security Support>> for more information.
205+
206+
[[x6.0-webflux]]
207+
=== Webflux Request Attributes Support
208+
209+
Webclient Request attributes support has been added for `WebFluxRequestExecutingMessageHandler`.
210+
211+
See <<./webflux.adoc#webflux-request-attributes,WebFlux Request Attributes>> for more information.
212+
213+
214+

0 commit comments

Comments
 (0)