diff --git a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/config/WebFluxOutboundChannelAdapterParser.java b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/config/WebFluxOutboundChannelAdapterParser.java index 0072cf69af8..33ec0903141 100644 --- a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/config/WebFluxOutboundChannelAdapterParser.java +++ b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/config/WebFluxOutboundChannelAdapterParser.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import org.w3c.dom.Element; +import org.springframework.beans.factory.config.BeanDefinition; import org.springframework.beans.factory.config.RuntimeBeanReference; import org.springframework.beans.factory.support.BeanDefinitionBuilder; import org.springframework.beans.factory.xml.ParserContext; @@ -31,7 +32,7 @@ * Parser for the 'outbound-channel-adapter' element of the webflux namespace. * * @author Artem Bilan - * + * @author Jatin Saxena * @since 5.0 */ public class WebFluxOutboundChannelAdapterParser extends HttpOutboundChannelAdapterParser { @@ -79,6 +80,13 @@ else if (hasTypeExpression) { .getBeanDefinition()); } + BeanDefinition attributeVariablesExpressionDef = IntegrationNamespaceUtils + .createExpressionDefIfAttributeDefined("attribute-variables-expression", element); + + if (attributeVariablesExpressionDef != null) { + builder.addPropertyValue("attributeVariablesExpression", attributeVariablesExpressionDef); + } + return builder; } diff --git a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java index 076a57f2b85..3354728a336 100644 --- a/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java +++ b/spring-integration-webflux/src/main/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandler.java @@ -27,6 +27,7 @@ import org.springframework.core.io.Resource; import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; +import org.springframework.expression.spel.support.StandardEvaluationContext; import org.springframework.http.HttpEntity; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatusCode; @@ -35,11 +36,13 @@ import org.springframework.http.ResponseEntity; import org.springframework.http.client.reactive.ClientHttpRequest; import org.springframework.http.client.reactive.ClientHttpResponse; +import org.springframework.integration.expression.ExpressionUtils; import org.springframework.integration.expression.ValueExpression; import org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyExtractor; import org.springframework.web.reactive.function.BodyExtractors; @@ -57,6 +60,7 @@ * @author Artem Bilan * @author Gary Russell * @author David Graff + * @author Jatin Saxena * * @since 5.0 * @@ -75,6 +79,11 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx private Expression publisherElementTypeExpression; + private Expression attributeVariablesExpression; + + private StandardEvaluationContext evaluationContext; + + /** * Create a handler that will send requests to the provided URI. * @param uri The URI. @@ -193,11 +202,28 @@ public void setPublisherElementTypeExpression(Expression publisherElementTypeExp this.publisherElementTypeExpression = publisherElementTypeExpression; } + /** + * Configure expression to evaluate request attribute which will be added to webclient request attribute. + * @param attributeVariablesExpression the expression to evaluate request attribute. + * @since 6.0 + * @see WebClient.RequestBodySpec#attributes + */ + public void setAttributeVariablesExpression(Expression attributeVariablesExpression) { + Assert.notNull(attributeVariablesExpression, "'attributeVariablesExpression' must not be null"); + this.attributeVariablesExpression = attributeVariablesExpression; + } + @Override public String getComponentType() { return (isExpectReply() ? "webflux:outbound-gateway" : "webflux:outbound-channel-adapter"); } + @Override + protected final void doInit() { + super.doInit(); + this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); + } + @Override @Nullable protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity httpRequest, @@ -230,6 +256,14 @@ private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod h } requestSpec = requestSpec.headers(headers -> headers.putAll(httpRequest.getHeaders())); + + if (this.attributeVariablesExpression != null) { + Map attributeMap = evaluateAttributeVariables(requestMessage); + if (!CollectionUtils.isEmpty(attributeMap)) { + requestSpec = requestSpec.attributes(map -> map.putAll(attributeMap)); + } + } + BodyInserter inserter = buildBodyInserterForRequest(requestMessage, httpRequest); if (inserter != null) { requestSpec.body(inserter); @@ -369,4 +403,9 @@ private Object createReplyFromResponse(Mono>> respon .map(this::getReply); } + @SuppressWarnings("unchecked") + private Map evaluateAttributeVariables(Message requestMessage) { + return this.attributeVariablesExpression.getValue(this.evaluationContext, requestMessage, Map.class); + } + } diff --git a/spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux.xsd b/spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux.xsd index bb07ef4908b..d2a04069c6d 100644 --- a/spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux.xsd +++ b/spring-integration-webflux/src/main/resources/org/springframework/integration/webflux/config/spring-integration-webflux.xsd @@ -419,6 +419,13 @@ + + + + + @@ -592,6 +599,13 @@ + + + + + diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests-context.xml b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests-context.xml index cbe48c001e5..b78b84d2104 100644 --- a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests-context.xml +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests-context.xml @@ -39,7 +39,8 @@ reply-payload-to-flux="true" body-extractor="bodyExtractor" publisher-element-type-expression="headers.elementType" - extract-response-body="false"> + extract-response-body="false" + attribute-variables-expression="{name:{first:'Nikola',last:'Tesla'},dob:{day:10,month:'July',year:1856}}"> diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests.java b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests.java index 0887625592b..ce1d22ade15 100644 --- a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests.java +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests.java @@ -40,6 +40,7 @@ /** * @author Artem Bilan + * @author Jatin Saxena * * @since 5.0 */ @@ -129,6 +130,8 @@ public void reactiveFullConfig() { .isEqualTo("headers.elementType"); assertThat(handlerAccessor.getPropertyValue("extractResponseBody")) .isEqualTo(false); + assertThat(handlerAccessor.getPropertyValue("attributeVariablesExpression.expression")) + .isEqualTo("{name:{first:'Nikola',last:'Tesla'},dob:{day:10,month:'July',year:1856}}"); } } diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java index 99b9ad2cf2f..df3c7c91951 100644 --- a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java @@ -18,15 +18,20 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Map; +import java.util.Optional; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; +import org.springframework.beans.factory.BeanFactory; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferLimitException; +import org.springframework.expression.Expression; +import org.springframework.expression.spel.standard.SpelExpressionParser; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.client.reactive.ClientHttpConnector; @@ -40,17 +45,22 @@ import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.support.ErrorMessage; import org.springframework.test.web.reactive.server.HttpHandlerConnector; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.ExchangeFunction; import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; /** * @author Shiliang Li * @author Artem Bilan * @author David Graff - * + * @author Jatin Saxena * @since 5.0 */ class WebFluxRequestExecutingMessageHandlerTests { @@ -376,4 +386,63 @@ void testMaxInMemorySizeExceeded() { .isEqualTo("Exceeded limit on max bytes to buffer : 1"); } + @Test + @SuppressWarnings("unchecked") + void testFluxReplyWithRequestAttribute() { + ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> { + response.setStatusCode(HttpStatus.OK); + response.getHeaders().setContentType(MediaType.TEXT_PLAIN); + + DataBufferFactory bufferFactory = response.bufferFactory(); + + Mono data = Mono.just(bufferFactory.wrap("foo\nbar\nbaz".getBytes())); + + return response.writeWith(data) + .then(Mono.defer(response::setComplete)); + }); + + class AttributeFilter implements ExchangeFilterFunction { + + Optional attributeValueName; + + @Override + public Mono filter(ClientRequest request, ExchangeFunction next) { + this.attributeValueName = request.attribute("name"); + return next.exchange(request); + } + } + + AttributeFilter attributeFilter = new AttributeFilter(); + WebClient webClient = WebClient.builder() + .clientConnector(httpConnector) + .filter(attributeFilter) + .build(); + + String destinationUri = "https://www.springsource.org/spring-integration"; + WebFluxRequestExecutingMessageHandler reactiveHandler = + new WebFluxRequestExecutingMessageHandler(destinationUri, webClient); + + QueueChannel replyChannel = new QueueChannel(); + reactiveHandler.setOutputChannel(replyChannel); + reactiveHandler.setExpectedResponseType(String.class); + reactiveHandler.setReplyPayloadToFlux(true); + Expression expr = new SpelExpressionParser().parseExpression("{name:{first:'Nikola'}}"); + reactiveHandler.setAttributeVariablesExpression(expr); + reactiveHandler.setBeanFactory(mock(BeanFactory.class)); + reactiveHandler.afterPropertiesSet(); + + reactiveHandler.handleMessage(MessageBuilder.withPayload(Mono.just("hello, world")).build()); + + Message receive = replyChannel.receive(10_000); + + assertThat(attributeFilter.attributeValueName).isPresent(); + + Map attributeValueNameMap = (Map) attributeFilter.attributeValueName.get(); + + assertThat(attributeValueNameMap.get("first")).isEqualTo("Nikola"); + + assertThat(receive).isNotNull(); + + } + } diff --git a/src/reference/asciidoc/webflux.adoc b/src/reference/asciidoc/webflux.adoc index 8edba5b95d0..389e50ed870 100644 --- a/src/reference/asciidoc/webflux.adoc +++ b/src/reference/asciidoc/webflux.adoc @@ -319,3 +319,10 @@ See <<./http.adoc#http-outbound,HTTP Outbound Components>> for more possible con Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping. See <<./http.adoc#http-header-mapping,HTTP Header Mappings>> for more possible options and components to use for mapping headers. + +[[webflux-request-attributes]] +=== WebFlux Request Attributes + +Starting with version 6.0, Webflux outbound request `WebFluxRequestExecutingMessageHandler` can have request attributes which can be specified in `setAttributeVariablesExpression()`. +This will take an `expression` that should be evaluated in `Map`. +This will be helpful if an information in form of key-value object needs be passed from `Message` to request and downstream filter will get access to this attribute for further processing. \ No newline at end of file diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index ec7befe0729..20a56e93246 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -186,3 +186,12 @@ See <<./jms.adoc#jms,JMS Support>> for more information. The `ChannelSecurityInterceptor` and its annotation `@SecuredChannel` and XML `` configurations have been deprecated in favor of `AuthorizationChannelInterceptor`. See <<./security.adoc#security,Security Support>> for more information. + +[[x6.0-webflux]] +=== Webflux Request Attributes Support + +Webclient Request attributes support has been added for `WebFluxRequestExecutingMessageHandler`. +See <<./webflux.adoc#webflux-request-attributes,WebFlux Request Attributes>> for more information. + + +