Skip to content

Commit 776811b

Browse files
committed
Defer ExchangeFilterFunction execution in WebClient
Prior to this commit, the `DefaultWebClient` would execute the configured `ExchangeFilterFunction` as the reactive pipeline is assembled during subscription. This means that if imperative code is executed in a filter function, it won't be aware of the current observation through the local scope. For example, when automatic context propagation is enabled for Reactor operators, the logger MDC will not know about the current traceId/spanId. This commit ensures that client filter functions execution is deferred during the actual client exchange. Fixes gh-33559
1 parent de4ff4b commit 776811b

File tree

3 files changed

+25
-2
lines changed

3 files changed

+25
-2
lines changed

spring-webflux/spring-webflux.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ dependencies {
3737
testImplementation(testFixtures(project(":spring-web")))
3838
testImplementation("com.fasterxml:aalto-xml")
3939
testImplementation("com.squareup.okhttp3:mockwebserver")
40+
testImplementation("io.micrometer:context-propagation")
4041
testImplementation("io.micrometer:micrometer-observation-test")
4142
testImplementation("io.projectreactor:reactor-test")
4243
testImplementation("io.reactivex.rxjava3:rxjava")

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -453,8 +453,9 @@ public Mono<ClientResponse> exchange() {
453453
ClientRequest request = requestBuilder.build();
454454
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
455455
observationContext.setRequest(request);
456-
Mono<ClientResponse> responseMono = filterFunction.apply(exchangeFunction)
457-
.exchange(request)
456+
final ExchangeFilterFunction finalFilterFunction = filterFunction;
457+
Mono<ClientResponse> responseMono = Mono.defer(
458+
() -> finalFilterFunction.apply(exchangeFunction).exchange(request))
458459
.checkpoint("Request to " +
459460
WebClientUtils.getRequestDescription(request.method(), request.url()) +
460461
" [DefaultWebClient]")

spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java

+21
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@
2727
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
2828
import io.micrometer.observation.tck.TestObservationRegistry;
2929
import io.micrometer.observation.tck.TestObservationRegistryAssert;
30+
import org.junit.jupiter.api.AfterEach;
3031
import org.junit.jupiter.api.BeforeEach;
3132
import org.junit.jupiter.api.Test;
3233
import org.mockito.ArgumentCaptor;
3334
import reactor.core.publisher.Flux;
35+
import reactor.core.publisher.Hooks;
3436
import reactor.core.publisher.Mono;
3537
import reactor.test.StepVerifier;
3638

@@ -63,6 +65,7 @@ class WebClientObservationTests {
6365

6466
@BeforeEach
6567
void setup() {
68+
Hooks.enableAutomaticContextPropagation();
6669
ClientResponse mockResponse = mock();
6770
when(mockResponse.statusCode()).thenReturn(HttpStatus.OK);
6871
when(mockResponse.headers()).thenReturn(new MockClientHeaders());
@@ -74,6 +77,11 @@ void setup() {
7477
this.observationRegistry.observationConfig().observationHandler(new HeaderInjectingHandler());
7578
}
7679

80+
@AfterEach
81+
void cleanUp() {
82+
Hooks.disableAutomaticContextPropagation();
83+
}
84+
7785
@Test
7886
void recordsObservationForSuccessfulExchange() {
7987
this.builder.build().get().uri("/resource/{id}", 42)
@@ -148,6 +156,19 @@ void setsCurrentObservationInReactorContext() {
148156
verifyAndGetRequest();
149157
}
150158

159+
@Test
160+
void setsCurrentObservationInScope() {
161+
ExchangeFilterFunction assertionFilter = (request, chain) -> {
162+
Observation currentObservation = observationRegistry.getCurrentObservation();
163+
assertThat(currentObservation).isNotNull();
164+
assertThat(currentObservation.getContext()).isInstanceOf(ClientRequestObservationContext.class);
165+
return chain.exchange(request);
166+
};
167+
this.builder.filter(assertionFilter).build().get().uri("/resource/{id}", 42)
168+
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(5));
169+
verifyAndGetRequest();
170+
}
171+
151172
@Test
152173
void recordsObservationWithResponseDetailsWhenFilterFunctionErrors() {
153174
ExchangeFilterFunction errorFunction = (req, next) -> next.exchange(req).then(Mono.error(new IllegalStateException()));

0 commit comments

Comments
 (0)