Skip to content

Commit 8c73a2d

Browse files
authored
Add some infrastructure for Observation (#3879)
* Add some infrastructure for Observation * Populate an `ObservationRegistry` bean from the `IntegrationManagementConfigurer` into all the `IntegrationManagement` components * Introduce `MessageReceiverContext` and `MessageSenderContext` for easier usage in the target code * Implement `Observation` handling in the `AbstractMessageHandler` * Modify `ObservationPropagationChannelInterceptorTests` for new `MessageSenderContext` * Use `BridgeHandler` to ensure that `Observation` is propagated and handled properly * Verify that tags from the `AbstractMessageHandler` are preset on the consumer span * * Add a `DocumentedObservation` infrastructure * * Add `Timer` verification to the propagation test * * Update to the latest Observation API * * Add custom observation convention support for the `AbstractMessageHandler` * Use more meaningful prefix for Spring Integration tags * * Move singleton instance for `DefaultMessageReceiverObservationConvention` into `DefaultMessageReceiverObservationConvention` per se as an `INSTANCE` constant * Use `MeterRegistryAssert` in the `ObservationPropagationChannelInterceptorTests` to verify meters emitted * * And an integration test with Zipkin based on the `SampleTestRunner`
1 parent 5ece0e0 commit 8c73a2d

File tree

13 files changed

+596
-74
lines changed

13 files changed

+596
-74
lines changed

build.gradle

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,12 @@ project('spring-integration-core') {
528528
testImplementation "org.aspectj:aspectjweaver:$aspectjVersion"
529529
testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion"
530530
testImplementation 'io.micrometer:micrometer-observation-test'
531-
testImplementation 'io.micrometer:micrometer-tracing-test'
531+
testImplementation ('io.micrometer:micrometer-tracing-integration-test') {
532+
exclude group: 'io.opentelemetry'
533+
exclude group: 'com.wavefront'
534+
exclude group: 'io.micrometer', module: 'micrometer-tracing-bridge-otel'
535+
536+
}
532537
}
533538

534539
dokkaHtmlPartial {

spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationManagementConfiguration.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,8 @@
3131
import org.springframework.integration.support.management.metrics.MetricsCaptor;
3232
import org.springframework.util.Assert;
3333

34+
import io.micrometer.observation.ObservationRegistry;
35+
3436
/**
3537
* {@code @Configuration} class that registers a {@link IntegrationManagementConfigurer} bean.
3638
*
@@ -64,12 +66,16 @@ public void setImportMetadata(AnnotationMetadata importMetadata) {
6466

6567
@Bean(name = IntegrationManagementConfigurer.MANAGEMENT_CONFIGURER_NAME)
6668
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
67-
public IntegrationManagementConfigurer managementConfigurer(ObjectProvider<MetricsCaptor> metricsCaptorProvider) {
69+
public IntegrationManagementConfigurer managementConfigurer(
70+
ObjectProvider<MetricsCaptor> metricsCaptorProvider,
71+
ObjectProvider<ObservationRegistry> observationRegistryProvider) {
72+
6873
IntegrationManagementConfigurer configurer = new IntegrationManagementConfigurer();
6974
configurer.setDefaultLoggingEnabled(
7075
Boolean.parseBoolean(this.environment.resolvePlaceholders(
7176
(String) this.attributes.get("defaultLoggingEnabled"))));
7277
configurer.setMetricsCaptorProvider(metricsCaptorProvider);
78+
configurer.setObservationRegistry(observationRegistryProvider);
7379
return configurer;
7480
}
7581

spring-integration-core/src/main/java/org/springframework/integration/config/IntegrationManagementConfigurer.java

Lines changed: 48 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,10 +38,12 @@
3838
import org.springframework.messaging.MessageHandler;
3939
import org.springframework.util.Assert;
4040

41+
import io.micrometer.observation.ObservationRegistry;
42+
4143

4244
/**
4345
* Configures beans that implement {@link IntegrationManagement}.
44-
* Configures counts, stats, logging for all (or selected) components.
46+
* Configures logging, {@link MetricsCaptor} and {@link ObservationRegistry} for all (or selected) components.
4547
*
4648
* @author Gary Russell
4749
* @author Artem Bilan
@@ -74,6 +76,10 @@ public class IntegrationManagementConfigurer
7476

7577
private ObjectProvider<MetricsCaptor> metricsCaptorProvider;
7678

79+
private ObservationRegistry observationRegistry;
80+
81+
private ObjectProvider<ObservationRegistry> observationRegistryProvider;
82+
7783
@Override
7884
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
7985
this.applicationContext = applicationContext;
@@ -95,8 +101,8 @@ public void setBeanName(String name) {
95101
* {@link org.apache.commons.logging.Log#isDebugEnabled()} can be quite expensive
96102
* and account for an inordinate amount of CPU time.
97103
* <p>
98-
* Set this to false to disable logging by default in all framework components that implement
99-
* {@link IntegrationManagement} (channels, message handlers etc). This turns off logging such as
104+
* Set this to 'false' to disable logging by default in all framework components that implement
105+
* {@link IntegrationManagement} (channels, message handlers etc.) This turns off logging such as
100106
* "PreSend on channel", "Received message" etc.
101107
* <p>
102108
* After the context is initialized, individual components can have their setting changed by invoking
@@ -115,14 +121,21 @@ void setMetricsCaptorProvider(ObjectProvider<MetricsCaptor> metricsCaptorProvide
115121
this.metricsCaptorProvider = metricsCaptorProvider;
116122
}
117123

118-
@Nullable
119-
MetricsCaptor obtainMetricsCaptor() {
120-
if (this.metricsCaptor == null && this.metricsCaptorProvider != null) {
121-
this.metricsCaptor = this.metricsCaptorProvider.getIfUnique();
122-
}
123-
return this.metricsCaptor;
124+
/**
125+
* Set an {@link ObservationRegistry} to populate to the {@link IntegrationManagement} components
126+
* in the application context.
127+
* @param observationRegistry the {@link ObservationRegistry} to use.
128+
* @since 6.0
129+
*/
130+
public void setObservationRegistry(@Nullable ObservationRegistry observationRegistry) {
131+
this.observationRegistry = observationRegistry;
132+
}
133+
134+
void setObservationRegistry(ObjectProvider<ObservationRegistry> observationRegistryProvider) {
135+
this.observationRegistryProvider = observationRegistryProvider;
124136
}
125137

138+
126139
@Override
127140
public void afterSingletonsInstantiated() {
128141
Assert.state(this.applicationContext != null, "'applicationContext' must not be null");
@@ -133,15 +146,29 @@ public void afterSingletonsInstantiated() {
133146
registerComponentGauges();
134147
}
135148

136-
for (IntegrationManagement integrationManagement :
137-
this.applicationContext.getBeansOfType(IntegrationManagement.class).values()) {
149+
setupObservationRegistry();
138150

139-
enhanceIntegrationManagement(integrationManagement);
140-
}
151+
this.applicationContext.getBeansOfType(IntegrationManagement.class).values()
152+
.forEach(this::enhanceIntegrationManagement);
141153

142154
this.singletonsInstantiated = true;
143155
}
144156

157+
@Nullable
158+
private MetricsCaptor obtainMetricsCaptor() {
159+
if (this.metricsCaptor == null && this.metricsCaptorProvider != null) {
160+
this.metricsCaptor = this.metricsCaptorProvider.getIfUnique();
161+
}
162+
return this.metricsCaptor;
163+
}
164+
165+
@Nullable
166+
private void setupObservationRegistry() {
167+
if (this.observationRegistry == null && this.observationRegistryProvider != null) {
168+
this.observationRegistry = this.observationRegistryProvider.getIfUnique();
169+
}
170+
}
171+
145172
private void registerComponentGauges() {
146173
this.gauges.add(
147174
this.metricsCaptor.gaugeBuilder("spring.integration.channels", this,
@@ -169,17 +196,21 @@ private void enhanceIntegrationManagement(IntegrationManagement integrationManag
169196
if (this.metricsCaptor != null) {
170197
integrationManagement.registerMetricsCaptor(this.metricsCaptor);
171198
}
199+
if (this.observationRegistry != null) {
200+
integrationManagement.registerObservationRegistry(this.observationRegistry);
201+
}
172202
}
173203

174204
@Override
175205
public Object postProcessAfterInitialization(Object bean, String name) throws BeansException {
176-
if (this.singletonsInstantiated && bean instanceof IntegrationManagement) {
177-
enhanceIntegrationManagement((IntegrationManagement) bean);
206+
if (this.singletonsInstantiated && bean instanceof IntegrationManagement integrationManagement) {
207+
enhanceIntegrationManagement(integrationManagement);
178208
}
179209
return bean;
180210
}
181211

182-
@Override public void onApplicationEvent(ContextClosedEvent event) {
212+
@Override
213+
public void onApplicationEvent(ContextClosedEvent event) {
183214
if (event.getApplicationContext().equals(this.applicationContext)) {
184215
this.gauges.forEach(MeterFacade::remove);
185216
this.gauges.clear();

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,17 @@
2121
import org.springframework.integration.history.MessageHistory;
2222
import org.springframework.integration.support.management.metrics.MetricsCaptor;
2323
import org.springframework.integration.support.management.metrics.SampleFacade;
24+
import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention;
25+
import org.springframework.integration.support.management.observation.IntegrationObservation;
26+
import org.springframework.integration.support.management.observation.MessageReceiverContext;
27+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
2428
import org.springframework.integration.support.utils.IntegrationUtils;
29+
import org.springframework.lang.Nullable;
2530
import org.springframework.messaging.Message;
2631
import org.springframework.messaging.MessageHandler;
2732
import org.springframework.util.Assert;
2833

34+
import io.micrometer.observation.ObservationRegistry;
2935
import reactor.core.CoreSubscriber;
3036

3137
/**
@@ -37,21 +43,49 @@
3743
public abstract class AbstractMessageHandler extends MessageHandlerSupport
3844
implements MessageHandler, CoreSubscriber<Message<?>> {
3945

46+
@Nullable
47+
private MessageReceiverObservationConvention observationConvention;
48+
49+
/**
50+
* Set a custom {@link MessageReceiverObservationConvention} for {@link IntegrationObservation#HANDLER}.
51+
* Ignored if an {@link ObservationRegistry} is not configured for this component.
52+
* @param observationConvention the {@link MessageReceiverObservationConvention} to use.
53+
* @since 6.0
54+
*/
55+
public void setObservationConvention(@Nullable MessageReceiverObservationConvention observationConvention) {
56+
this.observationConvention = observationConvention;
57+
}
58+
4059
@Override // NOSONAR
4160
public void handleMessage(Message<?> message) {
4261
Assert.notNull(message, "Message must not be null");
43-
if (isLoggingEnabled() && this.logger.isDebugEnabled()) {
44-
this.logger.debug(this + " received message: " + message);
62+
if (isLoggingEnabled()) {
63+
this.logger.debug(() -> this + " received message: " + message);
4564
}
46-
MetricsCaptor metricsCaptor = getMetricsCaptor();
47-
if (metricsCaptor != null) {
48-
handleWithMetrics(message, metricsCaptor);
65+
ObservationRegistry observationRegistry = getObservationRegistry();
66+
if (observationRegistry != null) {
67+
handleWithObservation(message, observationRegistry);
4968
}
5069
else {
51-
doHandleMessage(message);
70+
MetricsCaptor metricsCaptor = getMetricsCaptor();
71+
if (metricsCaptor != null) {
72+
handleWithMetrics(message, metricsCaptor);
73+
}
74+
else {
75+
doHandleMessage(message);
76+
}
5277
}
5378
}
5479

80+
private void handleWithObservation(Message<?> message, ObservationRegistry observationRegistry) {
81+
IntegrationObservation.HANDLER.observation(
82+
this.observationConvention,
83+
DefaultMessageReceiverObservationConvention.INSTANCE,
84+
new MessageReceiverContext(message, getComponentName()),
85+
observationRegistry)
86+
.observe(() -> doHandleMessage(message));
87+
}
88+
5589
private void handleWithMetrics(Message<?> message, MetricsCaptor metricsCaptor) {
5690
SampleFacade sample = metricsCaptor.start();
5791
try {

spring-integration-core/src/main/java/org/springframework/integration/handler/MessageHandlerSupport.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,8 @@
3131
import org.springframework.integration.support.management.metrics.MetricsCaptor;
3232
import org.springframework.integration.support.management.metrics.TimerFacade;
3333

34+
import io.micrometer.observation.ObservationRegistry;
35+
3436
/**
3537
* Base class for Message handling components that provides basic validation and error
3638
* handling capabilities. Asserts that the incoming Message is not null and that it does
@@ -61,6 +63,8 @@ public abstract class MessageHandlerSupport extends IntegrationObjectSupport
6163

6264
private MetricsCaptor metricsCaptor;
6365

66+
private ObservationRegistry observationRegistry;
67+
6468
private int order = Ordered.LOWEST_PRECEDENCE;
6569

6670
private String managedName;
@@ -89,6 +93,15 @@ protected MetricsCaptor getMetricsCaptor() {
8993
return this.metricsCaptor;
9094
}
9195

96+
@Override
97+
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
98+
this.observationRegistry = observationRegistry;
99+
}
100+
101+
protected ObservationRegistry getObservationRegistry() {
102+
return this.observationRegistry;
103+
}
104+
92105
@Override
93106
public void setOrder(int order) {
94107
this.order = order;

spring-integration-core/src/main/java/org/springframework/integration/support/management/IntegrationManagement.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2021 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,10 +22,14 @@
2222
import org.springframework.jmx.export.annotation.ManagedAttribute;
2323
import org.springframework.lang.Nullable;
2424

25+
import io.micrometer.observation.ObservationRegistry;
26+
2527
/**
2628
* Base interface for Integration managed components.
2729
*
2830
* @author Gary Russell
31+
* @author Artem Bilan
32+
*
2933
* @since 4.2
3034
*
3135
*/
@@ -39,7 +43,7 @@ public interface IntegrationManagement extends NamedComponent, DisposableBean {
3943

4044
/**
4145
* Enable logging or not.
42-
* @param enabled dalse to disable.
46+
* @param enabled false to disable.
4347
*/
4448
@ManagedAttribute(description = "Use to disable debug logging during normal message flow")
4549
default void setLoggingEnabled(boolean enabled) {
@@ -80,13 +84,28 @@ default ManagementOverrides getOverrides() {
8084

8185
/**
8286
* Inject a {@link MetricsCaptor}.
87+
* Ignored if {@link ObservationRegistry} is provided.
8388
* @param captor the captor.
8489
* @since 5.0.4
90+
* @see #registerObservationRegistry(ObservationRegistry)
8591
*/
8692
default void registerMetricsCaptor(MetricsCaptor captor) {
8793
// no op
8894
}
8995

96+
/**
97+
* Inject an {@link ObservationRegistry}.
98+
* If provided, the {@link MetricsCaptor} is ignored.
99+
* The meters capturing has to be configured as an {@link io.micrometer.observation.ObservationHandler}
100+
* on the provided {@link ObservationRegistry}.
101+
* @param observationRegistry the {@link ObservationRegistry} to expose observations from the component.
102+
* @since 6.0
103+
* @see #registerMetricsCaptor(MetricsCaptor)
104+
*/
105+
default void registerObservationRegistry(ObservationRegistry observationRegistry) {
106+
// no op
107+
}
108+
90109
@Override
91110
default void destroy() {
92111
// no op
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.management.observation;
18+
19+
import io.micrometer.common.KeyValues;
20+
21+
/**
22+
* A default {@link MessageReceiverObservationConvention} implementation.
23+
* Provides low cardinalities as a {@link IntegrationObservation.HandlerTags} values.
24+
*
25+
* @author Artem Bilan
26+
*
27+
* @since 6.0
28+
*/
29+
public class DefaultMessageReceiverObservationConvention implements MessageReceiverObservationConvention {
30+
31+
/**
32+
* A shared singleton instance for {@link DefaultMessageReceiverObservationConvention}.
33+
*/
34+
public static final DefaultMessageReceiverObservationConvention INSTANCE =
35+
new DefaultMessageReceiverObservationConvention();
36+
37+
@Override
38+
public KeyValues getLowCardinalityKeyValues(MessageReceiverContext context) {
39+
return KeyValues.of(
40+
IntegrationObservation.HandlerTags.COMPONENT_NAME.withValue(context.getHandlerName()),
41+
IntegrationObservation.HandlerTags.COMPONENT_TYPE.withValue("handler"));
42+
}
43+
44+
}

0 commit comments

Comments
 (0)