-
Notifications
You must be signed in to change notification settings - Fork 1.1k
GH 3936 : Request attribute for webclient in webflux integration has been added #3950
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import org.springframework.core.io.Resource; | ||
import org.springframework.expression.Expression; | ||
import org.springframework.expression.common.LiteralExpression; | ||
import org.springframework.expression.spel.support.StandardEvaluationContext; | ||
import org.springframework.http.HttpEntity; | ||
import org.springframework.http.HttpMethod; | ||
import org.springframework.http.HttpStatusCode; | ||
|
@@ -35,6 +36,7 @@ | |
import org.springframework.http.ResponseEntity; | ||
import org.springframework.http.client.reactive.ClientHttpRequest; | ||
import org.springframework.http.client.reactive.ClientHttpResponse; | ||
import org.springframework.integration.expression.ExpressionUtils; | ||
import org.springframework.integration.expression.ValueExpression; | ||
import org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler; | ||
import org.springframework.lang.Nullable; | ||
|
@@ -57,6 +59,7 @@ | |
* @author Artem Bilan | ||
* @author Gary Russell | ||
* @author David Graff | ||
* @author Jatin Saxena | ||
* | ||
* @since 5.0 | ||
* | ||
|
@@ -75,6 +78,11 @@ public class WebFluxRequestExecutingMessageHandler extends AbstractHttpRequestEx | |
|
||
private Expression publisherElementTypeExpression; | ||
|
||
private Expression attributeVariablesExpression; | ||
|
||
private StandardEvaluationContext evaluationContext; | ||
|
||
|
||
/** | ||
* Create a handler that will send requests to the provided URI. | ||
* @param uri The URI. | ||
|
@@ -193,11 +201,27 @@ public void setPublisherElementTypeExpression(Expression publisherElementTypeExp | |
this.publisherElementTypeExpression = publisherElementTypeExpression; | ||
} | ||
|
||
/** | ||
* Configure expression to evaluate request attribute which will be added to webclient request attribute. | ||
* @param attributeVariablesExpression the expression to evaluate request attribute. | ||
* @since 6.0 | ||
* @see WebClient.RequestBodySpec#attributes | ||
*/ | ||
public void setAttributeVariablesExpression(Expression attributeVariablesExpression) { | ||
Assert.notNull(attributeVariablesExpression, "'attributeVariablesExpression' must not be null"); | ||
this.attributeVariablesExpression = attributeVariablesExpression; | ||
} | ||
|
||
@Override | ||
public String getComponentType() { | ||
return (isExpectReply() ? "webflux:outbound-gateway" : "webflux:outbound-channel-adapter"); | ||
} | ||
|
||
@Override | ||
protected final void doInit() { | ||
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory()); | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@Override | ||
@Nullable | ||
protected Object exchange(Object uri, HttpMethod httpMethod, HttpEntity<?> httpRequest, | ||
|
@@ -230,6 +254,14 @@ private WebClient.RequestBodySpec createRequestBodySpec(Object uri, HttpMethod h | |
} | ||
|
||
requestSpec = requestSpec.headers(headers -> headers.putAll(httpRequest.getHeaders())); | ||
|
||
if (this.attributeVariablesExpression != null) { | ||
Map<String, Object> attributeMap = evaluateAttributeVariables(requestMessage); | ||
if (attributeMap != null && !attributeMap.isEmpty()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
requestSpec = requestSpec.attributes(map -> map.putAll(attributeMap)); | ||
} | ||
} | ||
|
||
BodyInserter<?, ? super ClientHttpRequest> inserter = buildBodyInserterForRequest(requestMessage, httpRequest); | ||
if (inserter != null) { | ||
requestSpec.body(inserter); | ||
|
@@ -369,4 +401,9 @@ private Object createReplyFromResponse(Mono<ResponseEntity<Flux<Object>>> respon | |
.map(this::getReply); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private Map<String, Object> evaluateAttributeVariables(Message<?> requestMessage) { | ||
return this.attributeVariablesExpression.getValue(this.evaluationContext, requestMessage, Map.class); | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
artembilan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,15 +18,20 @@ | |
|
||
import java.nio.charset.StandardCharsets; | ||
import java.time.Duration; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
|
||
import org.junit.jupiter.api.Test; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
import reactor.test.StepVerifier; | ||
|
||
import org.springframework.beans.factory.BeanFactory; | ||
import org.springframework.core.io.buffer.DataBuffer; | ||
import org.springframework.core.io.buffer.DataBufferFactory; | ||
import org.springframework.core.io.buffer.DataBufferLimitException; | ||
import org.springframework.expression.Expression; | ||
import org.springframework.expression.spel.standard.SpelExpressionParser; | ||
import org.springframework.http.HttpStatus; | ||
import org.springframework.http.MediaType; | ||
import org.springframework.http.client.reactive.ClientHttpConnector; | ||
|
@@ -40,17 +45,22 @@ | |
import org.springframework.messaging.MessageHandlingException; | ||
import org.springframework.messaging.support.ErrorMessage; | ||
import org.springframework.test.web.reactive.server.HttpHandlerConnector; | ||
import org.springframework.web.reactive.function.client.ClientRequest; | ||
import org.springframework.web.reactive.function.client.ClientResponse; | ||
import org.springframework.web.reactive.function.client.ExchangeFilterFunction; | ||
import org.springframework.web.reactive.function.client.ExchangeFunction; | ||
import org.springframework.web.reactive.function.client.ExchangeStrategies; | ||
import org.springframework.web.reactive.function.client.WebClient; | ||
import org.springframework.web.reactive.function.client.WebClientResponseException; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.mockito.Mockito.mock; | ||
|
||
/** | ||
* @author Shiliang Li | ||
* @author Artem Bilan | ||
* @author David Graff | ||
* | ||
* @author Jatin Saxena | ||
* @since 5.0 | ||
*/ | ||
class WebFluxRequestExecutingMessageHandlerTests { | ||
|
@@ -376,4 +386,63 @@ void testMaxInMemorySizeExceeded() { | |
.isEqualTo("Exceeded limit on max bytes to buffer : 1"); | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") | ||
void testFluxReplyWithRequestAttribute() { | ||
ClientHttpConnector httpConnector = new HttpHandlerConnector((request, response) -> { | ||
response.setStatusCode(HttpStatus.OK); | ||
response.getHeaders().setContentType(MediaType.TEXT_PLAIN); | ||
|
||
DataBufferFactory bufferFactory = response.bufferFactory(); | ||
|
||
Mono<DataBuffer> data = Mono.just(bufferFactory.wrap("foo\nbar\nbaz".getBytes())); | ||
|
||
return response.writeWith(data) | ||
.then(Mono.defer(response::setComplete)); | ||
}); | ||
|
||
class AttributeFilter implements ExchangeFilterFunction { | ||
|
||
Optional<Object> attributeValueName; | ||
|
||
@Override | ||
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) { | ||
this.attributeValueName = request.attribute("name"); | ||
return next.exchange(request); | ||
} | ||
} | ||
|
||
AttributeFilter attributeFilter = new AttributeFilter(); | ||
WebClient webClient = WebClient.builder() | ||
.clientConnector(httpConnector) | ||
.filter(attributeFilter) | ||
.build(); | ||
|
||
String destinationUri = "https://www.springsource.org/spring-integration"; | ||
WebFluxRequestExecutingMessageHandler reactiveHandler = | ||
new WebFluxRequestExecutingMessageHandler(destinationUri, webClient); | ||
|
||
QueueChannel replyChannel = new QueueChannel(); | ||
reactiveHandler.setOutputChannel(replyChannel); | ||
reactiveHandler.setExpectedResponseType(String.class); | ||
reactiveHandler.setReplyPayloadToFlux(true); | ||
Expression expr = new SpelExpressionParser().parseExpression("{name:{first:'Nikola'}}"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just for your knowledge: there is a |
||
reactiveHandler.setAttributeVariablesExpression(expr); | ||
reactiveHandler.setBeanFactory(mock(BeanFactory.class)); | ||
reactiveHandler.afterPropertiesSet(); | ||
|
||
reactiveHandler.handleMessage(MessageBuilder.withPayload(Mono.just("hello, world")).build()); | ||
|
||
Message<?> receive = replyChannel.receive(10_000); | ||
|
||
assertThat(attributeFilter.attributeValueName).isPresent(); | ||
|
||
Map<String, String> attributeValueNameMap = (Map<String, String>) attributeFilter.attributeValueName.get(); | ||
|
||
assertThat(attributeValueNameMap.get("first")).isEqualTo("Nikola"); | ||
|
||
assertThat(receive).isNotNull(); | ||
|
||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -319,3 +319,9 @@ See <<./http.adoc#http-outbound,HTTP Outbound Components>> for more possible con | |
|
||
Since WebFlux components are fully based on the HTTP protocol, there is no difference in the HTTP headers mapping. | ||
See <<./http.adoc#http-header-mapping,HTTP Header Mappings>> for more possible options and components to use for mapping headers. | ||
|
||
[[webflux-request-attributes]] | ||
=== WebFlux Request Attributes | ||
|
||
Starting with version 6.0, Webflux outbound request `WebFluxRequestExecutingMessageHandler` can have request attributes which can be specified in `setAttributeVariablesExpression()`. | ||
This will take an `expression` that should be evaluated in `Map`. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I still would like to see a general sentence for reasoning behind these attributes. My point is that you need to convince me somehow in using this new feature from the doc. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how about this? Starting with version 6.0, Webflux outbound request There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds good! |
Uh oh!
There was an error while loading. Please reload this page.