Skip to content

Commit 5e4da62

Browse files
artembilanspring-builds
authored andcommitted
GH-9198: Fix MessageChannel observation for ErrorMessage
Fixes: #9198 When observation is enabled on the `MessageChannel`, the message to send is converted to a `MutableMessage`. In case of `ErrorMessage` this causes a loss of `originalMessage` and may lead to `ClassCastException` in the target error handler. * Check for `ErrorMessage` in the `AbstractMessageChannel.sendWithObservation()` and create a new one as a copy of original request, but including observation headers before performing `sendInternal()` * Modify `IntegrationObservabilityZipkinTests` to verify an observation with an `ErrorMessage` and its handler. (cherry picked from commit b0cbacf)
1 parent 6d25d21 commit 5e4da62

File tree

2 files changed

+45
-8
lines changed

2 files changed

+45
-8
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.locks.Lock;
3131
import java.util.concurrent.locks.ReentrantLock;
3232

33+
import io.micrometer.observation.Observation;
3334
import io.micrometer.observation.ObservationRegistry;
3435

3536
import org.springframework.beans.factory.BeanFactory;
@@ -59,6 +60,7 @@
5960
import org.springframework.messaging.MessageDeliveryException;
6061
import org.springframework.messaging.converter.MessageConverter;
6162
import org.springframework.messaging.support.ChannelInterceptor;
63+
import org.springframework.messaging.support.ErrorMessage;
6264
import org.springframework.messaging.support.InterceptableChannel;
6365
import org.springframework.util.Assert;
6466
import org.springframework.util.StringUtils;
@@ -335,12 +337,22 @@ else if (this.metricsCaptor != null) {
335337

336338
private boolean sendWithObservation(Message<?> message, long timeout) {
337339
MutableMessage<?> messageToSend = MutableMessage.of(message);
338-
return IntegrationObservation.PRODUCER.observation(
339-
this.observationConvention,
340-
DefaultMessageSenderObservationConvention.INSTANCE,
341-
() -> new MessageSenderContext(messageToSend, getComponentName()),
342-
this.observationRegistry)
343-
.observe(() -> sendInternal(messageToSend, timeout)); // NOSONAR - never null
340+
Observation observation = IntegrationObservation.PRODUCER.observation(
341+
this.observationConvention,
342+
DefaultMessageSenderObservationConvention.INSTANCE,
343+
() -> new MessageSenderContext(messageToSend, getComponentName()),
344+
this.observationRegistry);
345+
Boolean observe = observation.observe(() -> {
346+
Message<?> messageToSendInternal = messageToSend;
347+
if (message instanceof ErrorMessage errorMessage) {
348+
messageToSendInternal =
349+
new ErrorMessage(errorMessage.getPayload(),
350+
messageToSend.getHeaders(),
351+
errorMessage.getOriginalMessage());
352+
}
353+
return sendInternal(messageToSendInternal, timeout);
354+
});
355+
return Boolean.TRUE.equals(observe);
344356
}
345357

346358
private boolean sendWithMetrics(Message<?> message, long timeout) {

spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@
4848
import org.springframework.integration.handler.advice.HandleMessageAdvice;
4949
import org.springframework.lang.Nullable;
5050
import org.springframework.messaging.Message;
51+
import org.springframework.messaging.MessageChannel;
5152
import org.springframework.messaging.PollableChannel;
53+
import org.springframework.messaging.support.ErrorMessage;
5254
import org.springframework.messaging.support.GenericMessage;
5355

5456
import static org.assertj.core.api.Assertions.assertThat;
@@ -102,6 +104,18 @@ public SampleTestRunnerConsumer yourCode() {
102104
assertThat(configuration.observedHandlerLatch.await(10, TimeUnit.SECONDS)).isTrue();
103105

104106
await().untilAsserted(() -> assertThat(configuration.observationReference.get()).isNotNull());
107+
108+
MessageChannel errorChannel = applicationContext.getBean("myErrorChannel", MessageChannel.class);
109+
ErrorMessage errorMessage =
110+
new ErrorMessage(new RuntimeException("some error"), new GenericMessage<>("some original"));
111+
errorChannel.send(errorMessage);
112+
113+
assertThat(configuration.errorMessageReceivedLatch.await(10, TimeUnit.SECONDS)).isTrue();
114+
115+
ErrorMessage receivedErrorMessage = configuration.errorMessageReference.get();
116+
assertThat(receivedErrorMessage.getOriginalMessage()).isEqualTo(errorMessage.getOriginalMessage());
117+
assertThat(receivedErrorMessage.getPayload()).isEqualTo(errorMessage.getPayload());
118+
assertThat(receivedErrorMessage.getHeaders()).containsKeys("X-B3-TraceId", "X-B3-SpanId");
105119
}
106120

107121
SpansAssert.assertThat(bb.getFinishedSpans())
@@ -119,7 +133,7 @@ public SampleTestRunnerConsumer yourCode() {
119133
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "queueChannel")
120134
.hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer")
121135
.hasKindEqualTo(Span.Kind.PRODUCER))
122-
.hasSize(4);
136+
.hasSize(5);
123137

124138
MeterRegistryAssert.assertThat(getMeterRegistry())
125139
.hasTimerWithNameAndTags("spring.integration.handler",
@@ -134,7 +148,7 @@ public SampleTestRunnerConsumer yourCode() {
134148
@EnableIntegration
135149
@EnableIntegrationManagement(
136150
observationPatterns = {
137-
"${spring.integration.management.observation-patterns:testInboundGateway,skippedObservationInboundGateway,queueChannel,observedEndpoint,publishSubscribeChannel}",
151+
"${spring.integration.management.observation-patterns:testInboundGateway,skippedObservationInboundGateway,queueChannel,myErrorChannel,observedEndpoint,publishSubscribeChannel}",
138152
"${spring.integration.management.observation-patterns:}"
139153
})
140154
public static class ObservationIntegrationTestConfiguration {
@@ -181,6 +195,17 @@ BridgeHandler bridgeHandler(PublishSubscribeChannel publishSubscribeChannel) {
181195
return bridgeHandler;
182196
}
183197

198+
199+
AtomicReference<ErrorMessage> errorMessageReference = new AtomicReference<>();
200+
201+
CountDownLatch errorMessageReceivedLatch = new CountDownLatch(1);
202+
203+
@ServiceActivator(inputChannel = "myErrorChannel")
204+
void handleError(ErrorMessage errorMessage) {
205+
this.errorMessageReference.set(errorMessage);
206+
this.errorMessageReceivedLatch.countDown();
207+
}
208+
184209
@Bean
185210
HandleMessageAdvice observedHandlerAdvice() {
186211
return invocation -> {

0 commit comments

Comments
 (0)