Skip to content

Commit 4f5250b

Browse files
Propagate Reactor context over headers (#8591)
* Propagate Reactor context over headers When we do something like `Flux.from(Publisher)` and don't compose it with the one involved in the `Subscriber` context, we lose this context. * Provide a mechanism to propagate a Reactor context over message header produce within that context. * Restore this context in the `FluxMessageChannel` for a new publisher we use in this channel **Cherry-pick to `6.0.x`** * Fix language in docs Co-authored-by: Gary Russell <[email protected]> --------- Co-authored-by: Gary Russell <[email protected]>
1 parent 6dcdfa8 commit 4f5250b

File tree

6 files changed

+148
-24
lines changed

6 files changed

+148
-24
lines changed

spring-integration-core/src/main/java/org/springframework/integration/IntegrationMessageHeaderAccessor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2019 the original author or authors.
2+
* Copyright 2013-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@
2525
import java.util.concurrent.atomic.AtomicInteger;
2626
import java.util.function.BiFunction;
2727

28+
import reactor.util.context.ContextView;
29+
2830
import org.springframework.integration.acks.AcknowledgmentCallback;
2931
import org.springframework.lang.Nullable;
3032
import org.springframework.messaging.Message;
@@ -77,6 +79,12 @@ public class IntegrationMessageHeaderAccessor extends MessageHeaderAccessor {
7779
*/
7880
public static final String SOURCE_DATA = "sourceData";
7981

82+
/**
83+
* Raw source message.
84+
*/
85+
public static final String REACTOR_CONTEXT = "reactorContext";
86+
87+
8088
private static final BiFunction<String, String, String> TYPE_VERIFY_MESSAGE_FUNCTION =
8189
(name, trailer) -> "The '" + name + trailer;
8290

@@ -175,6 +183,16 @@ public <T> T getSourceData() {
175183
return (T) getHeader(SOURCE_DATA);
176184
}
177185

186+
/**
187+
* Get a {@link ContextView} header if present.
188+
* @return the {@link ContextView} header if present.
189+
* @since 6.0.5
190+
*/
191+
@Nullable
192+
public ContextView getReactorContext() {
193+
return getHeader(REACTOR_CONTEXT, ContextView.class);
194+
}
195+
178196
@SuppressWarnings("unchecked")
179197
@Nullable
180198
public <T> T getHeader(String key, Class<T> type) {

spring-integration-core/src/main/java/org/springframework/integration/StaticMessageHeaderAccessor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,9 @@
2020
import java.util.UUID;
2121
import java.util.concurrent.atomic.AtomicInteger;
2222

23+
import reactor.util.context.Context;
24+
import reactor.util.context.ContextView;
25+
2326
import org.springframework.integration.acks.AcknowledgmentCallback;
2427
import org.springframework.integration.acks.SimpleAcknowledgment;
2528
import org.springframework.lang.Nullable;
@@ -120,4 +123,19 @@ public static <T> T getSourceData(Message<?> message) {
120123
return (T) message.getHeaders().get(IntegrationMessageHeaderAccessor.SOURCE_DATA);
121124
}
122125

126+
/**
127+
* Get a {@link ContextView} header if present.
128+
* @param message the message to get a header from.
129+
* @return the {@link ContextView} header if present.
130+
* @since 6.0.5
131+
*/
132+
public static ContextView getReactorContext(Message<?> message) {
133+
ContextView reactorContext = message.getHeaders()
134+
.get(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, ContextView.class);
135+
if (reactorContext == null) {
136+
reactorContext = Context.empty();
137+
}
138+
return reactorContext;
139+
}
140+
123141
}

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
import reactor.core.scheduler.Scheduler;
3131
import reactor.core.scheduler.Schedulers;
3232

33+
import org.springframework.core.log.LogMessage;
34+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
35+
import org.springframework.integration.StaticMessageHeaderAccessor;
36+
import org.springframework.integration.support.MessageBuilder;
3337
import org.springframework.messaging.Message;
3438
import org.springframework.messaging.MessageDeliveryException;
3539
import org.springframework.util.Assert;
@@ -111,22 +115,35 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
111115
Flux.from(publisher)
112116
.delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next())
113117
.publishOn(this.scheduler)
114-
.handle((message, synchronousSink) -> {
115-
try {
116-
if (!send(message)) {
117-
logger.warn(new MessageDeliveryException(message,
118-
"Failed to send message to channel '" + this),
119-
"Message was not delivered");
120-
}
121-
}
122-
catch (Exception ex) {
123-
logger.warn(ex, () -> "Error during processing event: " + message);
124-
}
125-
})
118+
.flatMap((message) ->
119+
Mono.just(message)
120+
.handle((messageToHandle, sink) -> sendReactiveMessage(messageToHandle))
121+
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)))
126122
.contextCapture()
127123
.subscribe());
128124
}
129125

126+
private void sendReactiveMessage(Message<?> message) {
127+
Message<?> messageToSend = message;
128+
// We have just restored Reactor context, so no need in a header anymore.
129+
if (messageToSend.getHeaders().containsKey(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT)) {
130+
messageToSend =
131+
MessageBuilder.fromMessage(message)
132+
.removeHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT)
133+
.build();
134+
}
135+
try {
136+
if (!send(messageToSend)) {
137+
logger.warn(
138+
new MessageDeliveryException(messageToSend, "Failed to send message to channel '" + this),
139+
"Message was not delivered");
140+
}
141+
}
142+
catch (Exception ex) {
143+
logger.warn(ex, LogMessage.format("Error during processing event: %s", messageToSend));
144+
}
145+
}
146+
130147
@Override
131148
public void destroy() {
132149
this.active = false;

spring-integration-rsocket/src/main/java/org/springframework/integration/rsocket/inbound/RSocketInboundGateway.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import org.reactivestreams.Publisher;
2323
import reactor.core.publisher.Flux;
2424
import reactor.core.publisher.Mono;
25+
import reactor.util.context.ContextView;
2526

2627
import org.springframework.core.ReactiveAdapter;
2728
import org.springframework.core.ResolvableType;
2829
import org.springframework.core.codec.Decoder;
2930
import org.springframework.core.codec.Encoder;
3031
import org.springframework.core.io.buffer.DataBuffer;
3132
import org.springframework.core.io.buffer.DataBufferFactory;
33+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3234
import org.springframework.integration.gateway.MessagingGatewaySupport;
3335
import org.springframework.integration.rsocket.AbstractRSocketConnector;
3436
import org.springframework.integration.rsocket.ClientRSocketConnector;
@@ -151,7 +153,7 @@ public void setRequestElementClass(Class<?> requestElementClass) {
151153
/**
152154
* Specify the type of payload to be generated when the inbound RSocket request
153155
* content is read by the converters/encoders.
154-
* By default this value is null which means at runtime any "text" Content-Type will
156+
* By default, this value is null which means at runtime any "text" Content-Type will
155157
* result in String while all others default to {@code byte[].class}.
156158
* @param requestElementType The payload type.
157159
*/
@@ -212,9 +214,22 @@ public Mono<Void> handleMessage(Message<?> requestMessage) {
212214
}
213215
else {
214216
return requestMono
215-
.doOnNext(this::send)
216-
.then();
217+
.flatMap((message) ->
218+
Mono.deferContextual((context) ->
219+
Mono.just(message)
220+
.handle((messageToSend, sink) ->
221+
send(messageWithReactorContextIfAny(messageToSend, context)))));
222+
}
223+
}
224+
225+
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
226+
if (!context.isEmpty()) {
227+
return getMessageBuilderFactory()
228+
.fromMessage(message)
229+
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
230+
.build();
217231
}
232+
return message;
218233
}
219234

220235
private Mono<Message<?>> decodeRequestMessage(Message<?> requestMessage) {

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/inbound/WebFluxInboundEndpoint.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.reactivestreams.Publisher;
3131
import reactor.core.publisher.Flux;
3232
import reactor.core.publisher.Mono;
33+
import reactor.util.context.ContextView;
3334
import reactor.util.function.Tuple2;
3435

3536
import org.springframework.core.ReactiveAdapter;
@@ -47,6 +48,7 @@
4748
import org.springframework.http.codec.ServerCodecConfigurer;
4849
import org.springframework.http.server.reactive.ServerHttpRequest;
4950
import org.springframework.http.server.reactive.ServerHttpResponse;
51+
import org.springframework.integration.IntegrationMessageHeaderAccessor;
5052
import org.springframework.integration.expression.ExpressionEvalMap;
5153
import org.springframework.integration.http.HttpHeaders;
5254
import org.springframework.integration.http.inbound.BaseHttpInboundEndpoint;
@@ -99,7 +101,7 @@ public WebFluxInboundEndpoint(boolean expectReply) {
99101

100102
/**
101103
* A {@link ServerCodecConfigurer} for the request readers and response writers.
102-
* By default the {@link ServerCodecConfigurer#create()} factory is used.
104+
* By default, the {@link ServerCodecConfigurer#create()} factory is used.
103105
* @param codecConfigurer the {@link ServerCodecConfigurer} to use.
104106
*/
105107
public void setCodecConfigurer(ServerCodecConfigurer codecConfigurer) {
@@ -133,9 +135,9 @@ public String getComponentType() {
133135

134136
@Override
135137
public Mono<Void> handle(ServerWebExchange exchange) {
136-
return Mono.defer(() -> {
138+
return Mono.deferContextual((context) -> {
137139
if (isRunning()) {
138-
return doHandle(exchange);
140+
return doHandle(exchange, context);
139141
}
140142
else {
141143
return Mono.error(new ResponseStatusException(HttpStatus.SERVICE_UNAVAILABLE, "Endpoint is stopped"))
@@ -144,13 +146,13 @@ public Mono<Void> handle(ServerWebExchange exchange) {
144146
});
145147
}
146148

147-
private Mono<Void> doHandle(ServerWebExchange exchange) {
149+
private Mono<Void> doHandle(ServerWebExchange exchange, ContextView context) {
148150
return extractRequestBody(exchange)
149151
.doOnSubscribe(s -> this.activeCount.incrementAndGet())
150152
.map(body ->
151153
new RequestEntity<>(body, exchange.getRequest().getHeaders(),
152154
exchange.getRequest().getMethod(), exchange.getRequest().getURI()))
153-
.flatMap(entity -> buildMessage(entity, exchange))
155+
.flatMap(entity -> buildMessage(entity, exchange, context))
154156
.flatMap(requestTuple -> {
155157
if (isExpectReply()) {
156158
return sendAndReceiveMessageReactive(requestTuple.getT1())
@@ -253,7 +255,7 @@ private Mono<?> readRequestBody(ServerWebExchange exchange, MediaType contentTyp
253255
}
254256

255257
private Mono<Tuple2<Message<Object>, RequestEntity<?>>> buildMessage(RequestEntity<?> httpEntity,
256-
ServerWebExchange exchange) {
258+
ServerWebExchange exchange, ContextView context) {
257259

258260
ServerHttpRequest request = exchange.getRequest();
259261
MultiValueMap<String, String> requestParams = request.getQueryParams();
@@ -283,6 +285,10 @@ private Mono<Tuple2<Message<Object>, RequestEntity<?>>> buildMessage(RequestEnti
283285
AbstractIntegrationMessageBuilder<Object> messageBuilder =
284286
prepareRequestMessageBuilder(request, payload, headers);
285287

288+
if (!context.isEmpty()) {
289+
messageBuilder.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context);
290+
}
291+
286292
return exchange.getPrincipal()
287293
.map(principal -> messageBuilder.setHeader(HttpHeaders.USER_PRINCIPAL, principal))
288294
.defaultIfEmpty(messageBuilder)

src/reference/asciidoc/reactive-streams.adoc

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ The subscription for this `Mono` is done using `Schedulers.boundedElastic()` to
105105
When the message source returns `null` (no data to pull), the `Mono` is turned into a `repeatWhenEmpty()` state with a `delay` for a subsequent re-subscription based on a `IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY` `Duration` entry from the subscriber context.
106106
By default, it is 1 second.
107107
If the `MessageSource` produces messages with a `IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK` information in the headers, it is acknowledged (if necessary) in the `doOnSuccess()` of the original `Mono` and rejected in the `doOnError()` if the downstream flow throws a `MessagingException` with the failed message to reject.
108-
This `ReactiveMessageSourceProducer` could be used for any use-case when a a polling channel adapter's features should be turned into a reactive, on demand solution for any existing `MessageSource<?>` implementation.
108+
This `ReactiveMessageSourceProducer` could be used for any use-case when a polling channel adapter's features should be turned into a reactive, on demand solution for any existing `MessageSource<?>` implementation.
109109

110110
=== Splitter and Aggregator
111111

@@ -332,3 +332,53 @@ Currently, Spring Integration provides channel adapter (or gateway) implementati
332332
The <<./redis.adoc#redis-stream-outbound,Redis Stream Channel Adapters>> are also reactive and uses `ReactiveStreamOperations` from Spring Data.
333333
More reactive channel adapters are coming, for example for Apache Kafka in <<./kafka.adoc#kafka,Kafka>> based on the `ReactiveKafkaProducerTemplate` and `ReactiveKafkaConsumerTemplate` from https://spring.io/projects/spring-kafka[Spring for Apache Kafka] etc.
334334
For many other non-reactive channel adapters thread pools are recommended to avoid blocking during reactive stream processing.
335+
336+
[[context-propagation]]
337+
=== Reactive to Imperative Context Propagation
338+
339+
When the https://github.com/micrometer-metrics/context-propagation[Context Propagation] library is on the classpath, the Project Reactor can take `ThreadLocal` values (e.g. https://micrometer.io/docs/observation[Micrometer Observation] or `SecurityContextHolder`) and store them into a `Subscriber` context.
340+
The opposite operation is also possible, when we need to populate a logging MDC for tracing or let services we call from the reactive stream to restore an observation from the scope.
341+
See more information in Project Reactor https://projectreactor.io/docs/core/release/reference/#context.propagation[documentation] about its special operators for context propagation.
342+
The storing and restoring context works smoothly if our whole solution is a single reactive stream composition since a `Subscriber` context is visible from downstream up to the beginning of the composition(`Flux` or `Mono`).
343+
But, if the application switches between different `Flux` instances or into imperative processing and back, then the context tied to the `Subscriber` might not be available.
344+
For such a use case, Spring Integration provides an additional capability (starting with version `6.0.5`) to store a Reactor `ContextView` into the `IntegrationMessageHeaderAccessor.REACTOR_CONTEXT` message header produced from the reactive stream, e.g. when we perform direct `send()` operation.
345+
This header is used then in the `FluxMessageChannel.subscribeTo()` to restore a Reactor context for the `Message` that this channel is going to emit.
346+
Currently, this header is populated from the `WebFluxInboundEndpoint` and `RSocketInboundGateway` components, but can be used in any solution where reactive to imperative integration is performed.
347+
The logic to populate this header is like this:
348+
349+
====
350+
[source, java]
351+
----
352+
return requestMono
353+
.flatMap((message) ->
354+
Mono.deferContextual((context) ->
355+
Mono.just(message)
356+
.handle((messageToSend, sink) ->
357+
send(messageWithReactorContextIfAny(messageToSend, context)))));
358+
...
359+
360+
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
361+
if (!context.isEmpty()) {
362+
return getMessageBuilderFactory()
363+
.fromMessage(message)
364+
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
365+
.build();
366+
}
367+
return message;
368+
}
369+
----
370+
====
371+
372+
Note, that we still need to use a `handle()` operator to make Reactor restore `ThreadLocal` values from the context.
373+
Even if it is sent as a header, the framework cannot make an assumption if it is going to be to restore onto `ThreadLocal` values downstream.
374+
375+
To restore the context from a `Message` on the other `Flux` or `Mono` composition, this logic can be performed:
376+
377+
====
378+
[source, java]
379+
----
380+
Mono.just(message)
381+
.handle((messageToHandle, sink) -> ...)
382+
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));
383+
----
384+
====

0 commit comments

Comments
 (0)