Skip to content

Commit b8158df

Browse files
committed
Create new observation context for WebClient retries
Prior to this commit, the `DefaultWebClient` observability instrumentation would create the observation context before the reactive pipeline is fully materialized. In case of errors and retries (with the `retry(long)` operator), the observation context would be reused for separate observations, which is incorrect. This commit ensures that a new observation context is created for each subscription. Fixes gh-34671
1 parent c4e25a1 commit b8158df

File tree

2 files changed

+27
-10
lines changed

2 files changed

+27
-10
lines changed

Diff for: spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

+16-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -449,20 +449,21 @@ public <V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> re
449449
@Override
450450
public Mono<ClientResponse> exchange() {
451451
ClientRequest.Builder requestBuilder = initRequestBuilder();
452-
ClientRequestObservationContext observationContext = new ClientRequestObservationContext(requestBuilder);
453452
return Mono.deferContextual(contextView -> {
454453
Observation observation = ClientHttpObservationDocumentation.HTTP_REACTIVE_CLIENT_EXCHANGES.observation(observationConvention,
455-
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
454+
DEFAULT_OBSERVATION_CONVENTION, () -> new ClientRequestObservationContext(requestBuilder), observationRegistry);
456455
observation
457456
.parentObservation(contextView.getOrDefault(ObservationThreadLocalAccessor.KEY, null))
458457
.start();
459-
ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observationContext);
458+
ExchangeFilterFunction filterFunction = new ObservationFilterFunction(observation.getContext());
460459
if (filterFunctions != null) {
461460
filterFunction = filterFunctions.andThen(filterFunction);
462461
}
463462
ClientRequest request = requestBuilder.build();
464-
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
465-
observationContext.setRequest(request);
463+
if (observation.getContext() instanceof ClientRequestObservationContext observationContext) {
464+
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
465+
observationContext.setRequest(request);
466+
}
466467
final ExchangeFilterFunction finalFilterFunction = filterFunction;
467468
Mono<ClientResponse> responseMono = Mono.defer(
468469
() -> finalFilterFunction.apply(exchangeFunction).exchange(request))
@@ -478,7 +479,8 @@ public Mono<ClientResponse> exchange() {
478479
.doOnNext(response -> responseReceived.set(true))
479480
.doOnError(observation::error)
480481
.doFinally(signalType -> {
481-
if (signalType == SignalType.CANCEL && !responseReceived.get()) {
482+
if (signalType == SignalType.CANCEL && !responseReceived.get() &&
483+
observation.getContext() instanceof ClientRequestObservationContext observationContext) {
482484
observationContext.setAborted(true);
483485
}
484486
observation.stop();
@@ -734,15 +736,19 @@ public Mono<? extends Throwable> apply(ClientResponse response) {
734736

735737
private static class ObservationFilterFunction implements ExchangeFilterFunction {
736738

737-
private final ClientRequestObservationContext observationContext;
739+
private final Observation.Context observationContext;
738740

739-
ObservationFilterFunction(ClientRequestObservationContext observationContext) {
741+
ObservationFilterFunction(Observation.Context observationContext) {
740742
this.observationContext = observationContext;
741743
}
742744

743745
@Override
744746
public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
745-
return next.exchange(request).doOnNext(this.observationContext::setResponse);
747+
Mono<ClientResponse> exchange = next.exchange(request);
748+
if (this.observationContext instanceof ClientRequestObservationContext clientContext) {
749+
exchange = exchange.doOnNext(clientContext::setResponse);
750+
}
751+
return exchange;
746752
}
747753
}
748754

Diff for: spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientObservationTests.java

+11
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ void setup() {
6969
when(mockResponse.statusCode()).thenReturn(HttpStatus.OK);
7070
when(mockResponse.headers()).thenReturn(new MockClientHeaders());
7171
when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty());
72+
when(mockResponse.bodyToMono(String.class)).thenReturn(Mono.error(IllegalStateException::new), Mono.just("Hello"));
7273
when(mockResponse.bodyToFlux(String.class)).thenReturn(Flux.just("first", "second"));
7374
when(mockResponse.releaseBody()).thenReturn(Mono.empty());
7475
when(this.exchangeFunction.exchange(this.request.capture())).thenReturn(Mono.just(mockResponse));
@@ -141,6 +142,16 @@ void recordsObservationForCancelledExchangeDuringResponse() {
141142
.hasLowCardinalityKeyValue("status", "200");
142143
}
143144

145+
@Test
146+
void recordsSingleObservationForRetries() {
147+
StepVerifier.create(this.builder.build().get().uri("/path").retrieve().bodyToMono(String.class).retry(1))
148+
.expectNextCount(1)
149+
.expectComplete()
150+
.verify(Duration.ofSeconds(2));
151+
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
152+
.hasLowCardinalityKeyValue("status", "200");
153+
}
154+
144155
@Test
145156
void setsCurrentObservationInReactorContext() {
146157
ExchangeFilterFunction assertionFilter = (request, chain) -> chain.exchange(request).contextWrite(context -> {

0 commit comments

Comments
 (0)