Skip to content

Commit 145c845

Browse files
committed
Fix race condition in the IntObservZipkinTests
The `QueueChannel.receive()` may be fulfilled before an observation is stopped in the `MessageHandler` * Rework `IntegrationObservabilityZipkinTests` configuration to add a `HandleMessageAdvice` to wait on the `CountDownLatch` before asserting spans * Exclude an `adviceChain` attribute from the `ServiceActivatorAnnotationPostProcessor` since an advice can be applied for the consumer endpoint, not the MH directly. The `ConsumerEndpointFactoryBean` does the proper decision to apply advice onto MH or just around its `handleMessage()` method
1 parent 91a8b14 commit 145c845

File tree

2 files changed

+33
-3
lines changed

2 files changed

+33
-3
lines changed

spring-integration-core/src/main/java/org/springframework/integration/config/annotation/ServiceActivatorAnnotationPostProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
public class ServiceActivatorAnnotationPostProcessor extends AbstractMethodAnnotationPostProcessor<ServiceActivator> {
4747

4848
public ServiceActivatorAnnotationPostProcessor() {
49-
this.messageHandlerAttributes.addAll(Arrays.asList("outputChannel", "requiresReply", "adviceChain"));
49+
this.messageHandlerAttributes.addAll(Arrays.asList("outputChannel", "requiresReply"));
5050
}
5151

5252
@Override

spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,23 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020

21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
2124
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
2225
import org.springframework.context.annotation.Bean;
2326
import org.springframework.context.annotation.Configuration;
2427
import org.springframework.integration.annotation.BridgeTo;
2528
import org.springframework.integration.annotation.EndpointId;
2629
import org.springframework.integration.annotation.Poller;
30+
import org.springframework.integration.annotation.ServiceActivator;
2731
import org.springframework.integration.channel.QueueChannel;
2832
import org.springframework.integration.channel.interceptor.ObservationPropagationChannelInterceptor;
2933
import org.springframework.integration.config.EnableIntegration;
3034
import org.springframework.integration.config.EnableIntegrationManagement;
3135
import org.springframework.integration.config.GlobalChannelInterceptor;
36+
import org.springframework.integration.handler.BridgeHandler;
37+
import org.springframework.integration.handler.advice.HandleMessageAdvice;
3238
import org.springframework.integration.support.MutableMessage;
3339
import org.springframework.integration.support.MutableMessageBuilder;
3440
import org.springframework.messaging.Message;
@@ -79,6 +85,9 @@ public SampleTestRunnerConsumer yourCode() {
7985
Message<?> receive = replyChannel.receive(10_000);
8086
assertThat(receive).isNotNull()
8187
.extracting("payload").isEqualTo("test data");
88+
var configuration = applicationContext.getBean(ObservationIntegrationTestConfiguration.class);
89+
90+
assertThat(configuration.observedHandlerLatch.await(10, TimeUnit.SECONDS)).isTrue();
8291
}
8392

8493
SpansAssert.assertThat(bb.getFinishedSpans())
@@ -105,19 +114,40 @@ public SampleTestRunnerConsumer yourCode() {
105114
@EnableIntegrationManagement
106115
public static class ObservationIntegrationTestConfiguration {
107116

117+
CountDownLatch observedHandlerLatch = new CountDownLatch(1);
118+
108119
@Bean
109120
@GlobalChannelInterceptor
110121
public ChannelInterceptor observationPropagationInterceptor(ObservationRegistry observationRegistry) {
111122
return new ObservationPropagationChannelInterceptor(observationRegistry);
112123
}
113124

114125
@Bean
115-
@BridgeTo(poller = @Poller(fixedDelay = "100"))
116-
@EndpointId("observedEndpoint")
117126
public PollableChannel queueChannel() {
118127
return new QueueChannel();
119128
}
120129

130+
@Bean
131+
@EndpointId("observedEndpoint")
132+
@ServiceActivator(inputChannel = "queueChannel",
133+
poller = @Poller(fixedDelay = "100"),
134+
adviceChain = "observedHandlerAdvice")
135+
BridgeHandler bridgeHandler() {
136+
return new BridgeHandler();
137+
}
138+
139+
@Bean
140+
HandleMessageAdvice observedHandlerAdvice() {
141+
return invocation -> {
142+
try {
143+
return invocation.proceed();
144+
}
145+
finally {
146+
this.observedHandlerLatch.countDown();
147+
}
148+
};
149+
}
150+
121151
}
122152

123153
}

0 commit comments

Comments
 (0)