Skip to content

GH 3936 : Request attribute for webclient in webflux integration has been added #3950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import org.w3c.dom.Element;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.xml.ParserContext;
Expand All @@ -31,7 +32,7 @@
* Parser for the 'outbound-channel-adapter' element of the webflux namespace.
*
* @author Artem Bilan
*
* @author Jatin Saxena
* @since 5.0
*/
public class WebFluxOutboundChannelAdapterParser extends HttpOutboundChannelAdapterParser {
Expand Down Expand Up @@ -79,6 +80,13 @@ else if (hasTypeExpression) {
.getBeanDefinition());
}

BeanDefinition attributeVariablesExpressionDef = IntegrationNamespaceUtils
.createExpressionDefIfAttributeDefined("attribute-variables-expression", element);

if (attributeVariablesExpressionDef != null) {
builder.addPropertyValue("attributeVariablesExpression", attributeVariablesExpressionDef);
}

return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.core.io.Resource;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatusCode;
Expand All @@ -35,6 +36,7 @@
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpRequest;
import org.springframework.http.client.reactive.ClientHttpResponse;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler;
import org.springframework.lang.Nullable;
Expand All @@ -57,6 +59,7 @@
* @author Artem Bilan
* @author Gary Russell
* @author David Graff
* @author Jatin Saxena
*
* @since 5.0
*
Expand All @@ -75,6 +78,11 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx

private Expression publisherElementTypeExpression;

private Expression attributeVariablesExpression;

private StandardEvaluationContext evaluationContext;


/**
* Create a handler that will send requests to the provided URI.
* @param uri The URI.
Expand Down Expand Up @@ -193,11 +201,21 @@ public void setPublisherElementTypeExpression(Expression publisherElementTypeExp
this.publisherElementTypeExpression = publisherElementTypeExpression;
}

public void setAttributeVariablesExpression(Expression attributeVariablesExpression) {
Assert.notNull(attributeVariablesExpression, "'attributeVariablesExpression' must not be null");
this.attributeVariablesExpression = attributeVariablesExpression;
}

@Override
public String getComponentType() {
return (isExpectReply() ? "webflux:outbound-gateway" : "webflux:outbound-channel-adapter");
}

@Override
protected final void doInit() {
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
}

@Override
@Nullable
protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity<?> httpRequest,
Expand Down Expand Up @@ -230,6 +248,12 @@ private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod h
}

requestSpec = requestSpec.headers(headers -> headers.putAll(httpRequest.getHeaders()));

if(attributeVariablesExpression != null) {
Map<String, Object> attributeMap = evaluateAttributeVariables(requestMessage);
requestSpec = requestSpec.attributes(map -> map.putAll(attributeMap));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do some optimization not calling this method if attributeMap.isEmpty()!

}

BodyInserter<?, ? super ClientHttpRequest> inserter = buildBodyInserterForRequest(requestMessage, httpRequest);
if (inserter != null) {
requestSpec.body(inserter);
Expand Down Expand Up @@ -369,4 +393,7 @@ private Object createReplyFromResponse(Mono<ResponseEntity<Flux<Object>>> respon
.map(this::getReply);
}

private Map<String,Object> evaluateAttributeVariables(Message<?> requestMessage){
return this.attributeVariablesExpression.getValue(this.evaluationContext, requestMessage, Map.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,13 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="attribute-variables-expression" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Specifies the SpEL expression to be evaluated as a Map for request attributes.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
</xsd:element>

Expand Down Expand Up @@ -592,6 +599,13 @@
<xsd:union memberTypes="xsd:boolean xsd:string"/>
</xsd:simpleType>
</xsd:attribute>
<xsd:attribute name="attribute-variables-expression" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Specifies the SpEL expression to be evaluated as a Map for request attributes.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:extension>
</xsd:complexContent>
</xsd:complexType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
reply-payload-to-flux="true"
body-extractor="bodyExtractor"
publisher-element-type-expression="headers.elementType"
extract-response-body="false">
extract-response-body="false"
attribute-variables-expression="{name:{first:'Nikola',last:'Tesla'},dob:{day:10,month:'July',year:1856}}">
<uri-variable name="foo" expression="headers.bar"/>
</outbound-gateway>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

/**
* @author Artem Bilan
* @author Jatin Saxena
*
* @since 5.0
*/
Expand Down Expand Up @@ -129,6 +130,8 @@ public void reactiveFullConfig() {
.isEqualTo("headers.elementType");
assertThat(handlerAccessor.getPropertyValue("extractResponseBody"))
.isEqualTo(false);
assertThat(handlerAccessor.getPropertyValue("attributeVariablesExpression.expression"))
.isEqualTo("{name:{first:'Nikola',last:'Tesla'},dob:{day:10,month:'July',year:1856}}");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,20 @@

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ClientHttpConnector;
Expand All @@ -40,17 +45,22 @@
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.test.web.reactive.server.HttpHandlerConnector;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

/**
* @author Shiliang Li
* @author Artem Bilan
* @author David Graff
*
* @author Jatin Saxena
* @since 5.0
*/
class WebFluxRequestExecutingMessageHandlerTests {
Expand Down Expand Up @@ -376,4 +386,63 @@ void testMaxInMemorySizeExceeded() {
.isEqualTo("Exceeded limit on max bytes to buffer : 1");
}

@Test
@SuppressWarnings("unchecked")
void testFluxReplyWithRequestAttribute() {
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> {
response.setStatusCode(HttpStatus.OK);
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);

DataBufferFactory bufferFactory = response.bufferFactory();

Mono<DataBuffer> data = Mono.just(bufferFactory.wrap("foo\nbar\nbaz".getBytes()));

return response.writeWith(data)
.then(Mono.defer(response::setComplete));
});

class AttributeFilter implements ExchangeFilterFunction {

Optional<Object> attributeValueName;

@Override
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
this.attributeValueName = request.attribute("name");
return next.exchange(request);
}
}

AttributeFilter attributeFilter = new AttributeFilter();
WebClient webClient = WebClient.builder()
.clientConnector(httpConnector)
.filter(attributeFilter)
.build();

String destinationUri = "https://www.springsource.org/spring-integration";
WebFluxRequestExecutingMessageHandler reactiveHandler =
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient);

QueueChannel replyChannel = new QueueChannel();
reactiveHandler.setOutputChannel(replyChannel);
reactiveHandler.setExpectedResponseType(String.class);
reactiveHandler.setReplyPayloadToFlux(true);
Expression expr = new SpelExpressionParser().parseExpression("{name:{first:'Nikola'}}");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for your knowledge: there is a FunctionExpression impl to be able to call as a lamda.
So, this one would be as simple as: Expression expr = new FunctionExpression<Object>(o -> Map.of("first", "Nikola")).
and much faster than SpEL parsing and its evaluation via reflection 😄

reactiveHandler.setAttributeVariablesExpression(expr);
reactiveHandler.setBeanFactory(mock(BeanFactory.class));
reactiveHandler.afterPropertiesSet();

reactiveHandler.handleMessage(MessageBuilder.withPayload(Mono.just("hello, world")).build());

Message<?> receive = replyChannel.receive(10_000);

assertThat(attributeFilter.attributeValueName).isPresent();

Map<String,String> attributeValueNameMap = (Map<String, String>) attributeFilter.attributeValueName.get();

assertThat(attributeValueNameMap.get("first")).isEqualTo("Nikola");

assertThat(receive).isNotNull();

}

}
5 changes: 5 additions & 0 deletions src/reference/asciidoc/webflux.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,8 @@ See <<./http.adoc#http-outbound,HTTP Outbound Components>> for more possible con

Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping.
See <<./http.adoc#http-header-mapping,HTTP Header Mappings>> for more possible options and components to use for mapping headers.

[[webflux-request-attribute]]
=== WebFlux Request Attribute
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attributes


Starting with Version 6, Webflux outbound request can have request attribute which can be specified in `attribute-variables-expression` attribute. This attribute will take an `expression` that should be evaluated in `Map`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Version" word not capitalized.
Version number is 6.0.
One sentence per line rule, please: https://asciidoctor.org/docs/asciidoc-recommended-practices/#one-sentence-per-line.

I wouldn't imply an XML config. Better to say WebFluxRequestExecutingMessageHandler and setAttributeVariablesExpression().
The XML configuration is really left in a maintenance mode and better to move away from it, more over with modern trends about native images.

Would be great to have some sample configuration and the reasoning behind this feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

8 changes: 8 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,11 @@ See <<./jms.adoc#jms,JMS Support>> for more information.
The `ChannelSecurityInterceptor` and its annotation `@SecuredChannel` and XML `<secured-channels>` configurations have been deprecated in favor of `AuthorizationChannelInterceptor`.

See <<./security.adoc#security,Security Support>> for more information.

[[x6.0-webflux]]

=== Webflux request attribute support
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blank line here between name and section body.

Webclient Request attribute support has been added for outbound request.See <<./webflux.adoc#webflux-request-attribute,WebFlux Request Attribute>> for more information.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One sentence per line, please.