-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH 3936 : Request attribute for webclient in webflux integration has been added #3950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import org.springframework.core.io.Resource; | ||
import org.springframework.expression.Expression; | ||
import org.springframework.expression.common.LiteralExpression; | ||
import org.springframework.expression.spel.support.StandardEvaluationContext; | ||
import org.springframework.http.HttpEntity; | ||
import org.springframework.http.HttpMethod; | ||
import org.springframework.http.HttpStatusCode; | ||
|
@@ -35,6 +36,7 @@ | |
import org.springframework.http.ResponseEntity; | ||
import org.springframework.http.client.reactive.ClientHttpRequest; | ||
import org.springframework.http.client.reactive.ClientHttpResponse; | ||
import org.springframework.integration.expression.ExpressionUtils; | ||
import org.springframework.integration.expression.ValueExpression; | ||
import org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler; | ||
import org.springframework.lang.Nullable; | ||
|
@@ -57,6 +59,7 @@ | |
* @author Artem Bilan | ||
* @author Gary Russell | ||
* @author David Graff | ||
* @author Jatin Saxena | ||
* | ||
* @since 5.0 | ||
* | ||
|
@@ -75,6 +78,11 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx | |
|
||
private Expression publisherElementTypeExpression; | ||
|
||
private Expression attributeVariablesExpression; | ||
|
||
private StandardEvaluationContext evaluationContext; | ||
|
||
|
||
/** | ||
* Create a handler that will send requests to the provided URI. | ||
* @param uri The URI. | ||
|
@@ -193,11 +201,21 @@ public void setPublisherElementTypeExpression(Expression publisherElementTypeExp | |
this.publisherElementTypeExpression = publisherElementTypeExpression; | ||
} | ||
|
||
public void setAttributeVariablesExpression(Expression attributeVariablesExpression) { | ||
Assert.notNull(attributeVariablesExpression, "'attributeVariablesExpression' must not be null"); | ||
this.attributeVariablesExpression = attributeVariablesExpression; | ||
} | ||
|
||
@Override | ||
public String getComponentType() { | ||
return (isExpectReply() ? "webflux:outbound-gateway" : "webflux:outbound-channel-adapter"); | ||
} | ||
|
||
@Override | ||
protected final void doInit() { | ||
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@Override | ||
@Nullable | ||
protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity<?> httpRequest, | ||
|
@@ -230,6 +248,12 @@ private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod h | |
} | ||
|
||
requestSpec = requestSpec.headers(headers -> headers.putAll(httpRequest.getHeaders())); | ||
|
||
if(attributeVariablesExpression != null) { | ||
Map<String, Object> attributeMap = evaluateAttributeVariables(requestMessage); | ||
requestSpec = requestSpec.attributes(map -> map.putAll(attributeMap)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's do some optimization not calling this method if |
||
} | ||
|
||
BodyInserter<?, ? super ClientHttpRequest> inserter = buildBodyInserterForRequest(requestMessage, httpRequest); | ||
if (inserter != null) { | ||
requestSpec.body(inserter); | ||
|
@@ -369,4 +393,7 @@ private Object createReplyFromResponse(Mono<ResponseEntity<Flux<Object>>> respon | |
.map(this::getReply); | ||
} | ||
|
||
private Map<String,Object> evaluateAttributeVariables(Message<?> requestMessage){ | ||
return this.attributeVariablesExpression.getValue(this.evaluationContext, requestMessage, Map.class); | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,15 +18,20 @@ | |
|
||
import java.nio.charset.StandardCharsets; | ||
import java.time.Duration; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
import org.junit.jupiter.api.Test; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
import reactor.test.StepVerifier; | ||
|
||
import org.springframework.beans.factory.BeanFactory; | ||
import org.springframework.core.io.buffer.DataBuffer; | ||
import org.springframework.core.io.buffer.DataBufferFactory; | ||
import org.springframework.core.io.buffer.DataBufferLimitException; | ||
import org.springframework.expression.Expression; | ||
import org.springframework.expression.spel.standard.SpelExpressionParser; | ||
import org.springframework.http.HttpStatus; | ||
import org.springframework.http.MediaType; | ||
import org.springframework.http.client.reactive.ClientHttpConnector; | ||
|
@@ -40,17 +45,22 @@ | |
import org.springframework.messaging.MessageHandlingException; | ||
import org.springframework.messaging.support.ErrorMessage; | ||
import org.springframework.test.web.reactive.server.HttpHandlerConnector; | ||
import org.springframework.web.reactive.function.client.ClientRequest; | ||
import org.springframework.web.reactive.function.client.ClientResponse; | ||
import org.springframework.web.reactive.function.client.ExchangeFilterFunction; | ||
import org.springframework.web.reactive.function.client.ExchangeFunction; | ||
import org.springframework.web.reactive.function.client.ExchangeStrategies; | ||
import org.springframework.web.reactive.function.client.WebClient; | ||
import org.springframework.web.reactive.function.client.WebClientResponseException; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.mockito.Mockito.mock; | ||
|
||
/** | ||
* @author Shiliang Li | ||
* @author Artem Bilan | ||
* @author David Graff | ||
* | ||
* @author Jatin Saxena | ||
* @since 5.0 | ||
*/ | ||
class WebFluxRequestExecutingMessageHandlerTests { | ||
|
@@ -376,4 +386,63 @@ void testMaxInMemorySizeExceeded() { | |
.isEqualTo("Exceeded limit on max bytes to buffer : 1"); | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") | ||
void testFluxReplyWithRequestAttribute() { | ||
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> { | ||
response.setStatusCode(HttpStatus.OK); | ||
response.getHeaders().setContentType(MediaType.TEXT_PLAIN); | ||
|
||
DataBufferFactory bufferFactory = response.bufferFactory(); | ||
|
||
Mono<DataBuffer> data = Mono.just(bufferFactory.wrap("foo\nbar\nbaz".getBytes())); | ||
|
||
return response.writeWith(data) | ||
.then(Mono.defer(response::setComplete)); | ||
}); | ||
|
||
class AttributeFilter implements ExchangeFilterFunction { | ||
|
||
Optional<Object> attributeValueName; | ||
|
||
@Override | ||
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) { | ||
this.attributeValueName = request.attribute("name"); | ||
return next.exchange(request); | ||
} | ||
} | ||
|
||
AttributeFilter attributeFilter = new AttributeFilter(); | ||
WebClient webClient = WebClient.builder() | ||
.clientConnector(httpConnector) | ||
.filter(attributeFilter) | ||
.build(); | ||
|
||
String destinationUri = "https://www.springsource.org/spring-integration"; | ||
WebFluxRequestExecutingMessageHandler reactiveHandler = | ||
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient); | ||
|
||
QueueChannel replyChannel = new QueueChannel(); | ||
reactiveHandler.setOutputChannel(replyChannel); | ||
reactiveHandler.setExpectedResponseType(String.class); | ||
reactiveHandler.setReplyPayloadToFlux(true); | ||
Expression expr = new SpelExpressionParser().parseExpression("{name:{first:'Nikola'}}"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just for your knowledge: there is a |
||
reactiveHandler.setAttributeVariablesExpression(expr); | ||
reactiveHandler.setBeanFactory(mock(BeanFactory.class)); | ||
reactiveHandler.afterPropertiesSet(); | ||
|
||
reactiveHandler.handleMessage(MessageBuilder.withPayload(Mono.just("hello, world")).build()); | ||
|
||
Message<?> receive = replyChannel.receive(10_000); | ||
|
||
assertThat(attributeFilter.attributeValueName).isPresent(); | ||
|
||
Map<String,String> attributeValueNameMap = (Map<String, String>) attributeFilter.attributeValueName.get(); | ||
|
||
assertThat(attributeValueNameMap.get("first")).isEqualTo("Nikola"); | ||
|
||
assertThat(receive).isNotNull(); | ||
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -319,3 +319,8 @@ See <<./http.adoc#http-outbound,HTTP Outbound Components>> for more possible con | |
|
||
Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping. | ||
See <<./http.adoc#http-header-mapping,HTTP Header Mappings>> for more possible options and components to use for mapping headers. | ||
|
||
[[webflux-request-attribute]] | ||
=== WebFlux Request Attribute | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
Starting with Version 6, Webflux outbound request can have request attribute which can be specified in `attribute-variables-expression` attribute. This attribute will take an `expression` that should be evaluated in `Map`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "Version" word not capitalized. I wouldn't imply an XML config. Better to say Would be great to have some sample configuration and the reasoning behind this feature. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Request attribute is standard webclient feature which can be used in multiple ways. Sample configuration (https://github.com/saxenaj/spring-integration/blob/GH-3936/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/config/WebFluxOutboundGatewayParserTests-context.xml#L43) and implementation (https://github.com/saxenaj/spring-integration/blob/GH-3936/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/outbound/WebFluxRequestExecutingMessageHandlerTests.java#L391) are there in test cases. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -186,3 +186,11 @@ See <<./jms.adoc#jms,JMS Support>> for more information. | |
The `ChannelSecurityInterceptor` and its annotation `@SecuredChannel` and XML `<secured-channels>` configurations have been deprecated in favor of `AuthorizationChannelInterceptor`. | ||
|
||
See <<./security.adoc#security,Security Support>> for more information. | ||
|
||
[[x6.0-webflux]] | ||
|
||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
=== Webflux request attribute support | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Blank line here between name and section body. |
||
Webclient Request attribute support has been added for outbound request.See <<./webflux.adoc#webflux-request-attribute,WebFlux Request Attribute>> for more information. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One sentence per line, please. |
||
|
||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.