diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index 021b81e714c..5838ad7f9fa 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import java.util.function.BiConsumer; import org.reactivestreams.Publisher; +import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -47,6 +48,7 @@ import org.springframework.integration.routingslip.RoutingSlipRouteStrategy; import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.utils.IntegrationUtils; +import org.springframework.integration.util.IntegrationReactiveUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -366,6 +368,7 @@ private static Publisher toPublisherReply(Object reply, @Nullable ReactiveAda } } + @SuppressWarnings("try") private static CompletableFuture toFutureReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) { if (reactiveAdapter != null) { Mono reactiveReply; @@ -377,7 +380,31 @@ private static CompletableFuture toFutureReply(Object reply, @Nullable Reacti reactiveReply = Mono.from(publisher); } - return reactiveReply.publishOn(Schedulers.boundedElastic()).toFuture(); + CompletableFuture replyFuture = new CompletableFuture<>(); + + reactiveReply + .publishOn(Schedulers.boundedElastic()) + // TODO until Reactor supports context propagation from the MonoToCompletableFuture + .doOnEach((signal) -> { + try (AutoCloseable scope = IntegrationReactiveUtils + .setThreadLocalsFromReactorContext(signal.getContextView())) { + + if (signal.isOnError()) { + replyFuture.completeExceptionally(signal.getThrowable()); + } + else { + replyFuture.complete(signal.get()); + } + + } + catch (Exception ex) { + throw Exceptions.bubble(ex); + } + }) + .contextCapture() + .subscribe(); + + return replyFuture; } else { return toCompletableFuture(reply); diff --git a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java index 4050d0a8521..92a985c5f73 100644 --- a/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java +++ b/spring-integration-webflux/src/test/java/org/springframework/integration/webflux/observation/WebFluxObservationPropagationTests.java @@ -36,26 +36,33 @@ import io.micrometer.tracing.test.simple.SpansAssert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import reactor.core.publisher.Mono; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.client.reactive.ClientHttpConnector; import org.springframework.http.server.reactive.HttpHandler; import org.springframework.integration.channel.FluxMessageChannel; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.config.EnableIntegrationManagement; +import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.webflux.dsl.WebFlux; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.web.SpringJUnitWebConfig; import org.springframework.test.web.reactive.server.HttpHandlerConnector; import org.springframework.test.web.reactive.server.WebTestClient; import org.springframework.web.reactive.config.EnableWebFlux; +import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.server.adapter.WebHttpHandlerBuilder; import static org.assertj.core.api.Assertions.assertThat; @@ -81,6 +88,10 @@ public class WebFluxObservationPropagationTests { @Autowired private PollableChannel testChannel; + @Autowired + @Qualifier("webFluxRequestReplyClientFlow.input") + private MessageChannel webFluxRequestReplyClientFlowInput; + @BeforeEach void setup() { SPANS.clear(); @@ -122,6 +133,20 @@ void observationIsPropagatedWebFluxRequestReply() { .haveSameTraceId(); } + @Test + void observationIsPropagatedWebFluxClientRequestReply() { + String result = + new MessagingTemplate() + .convertSendAndReceive(this.webFluxRequestReplyClientFlowInput, "test", String.class); + + assertThat(result).isEqualTo("SOME REPLY"); + + // There is a race condition when we already have a reply, but the span in the last channel is not closed yet. + await().untilAsserted(() -> assertThat(SPANS.spans()).hasSize(5)); + SpansAssert.assertThat(SPANS.spans().stream().map(BraveFinishedSpan::fromBrave).collect(Collectors.toList())) + .haveSameTraceId(); + } + @Configuration @EnableWebFlux @EnableIntegration @@ -188,6 +213,29 @@ FluxMessageChannel webFluxRequestChannel() { return new FluxMessageChannel(); } + @Bean + IntegrationFlow webFluxRequestReplyClientFlow(ObservationRegistry registry) { + ClientHttpConnector httpConnector = + new HttpHandlerConnector((request, response) -> { + response.setStatusCode(HttpStatus.OK); + + Mono replyData = Mono.just(response.bufferFactory().wrap("some reply".getBytes())); + + return response.writeWith(replyData) + .then(Mono.defer(response::setComplete)); + }); + WebClient webClient = + WebClient.builder() + .clientConnector(httpConnector) + .observationRegistry(registry) + .build(); + + return f -> f + .handle(WebFlux.outboundGateway(message -> "/someRequest", webClient) + .expectedResponseType(String.class)) + .transform(String::toUpperCase); + } + @Bean IntegrationFlow webFluxRequestReplyFlow( @Qualifier("webFluxRequestChannel") FluxMessageChannel webFluxRequestChannel) {