Skip to content

Commit 88e259c

Browse files
Add observation for message channels (#3944)
* Add observation for message channels * Add observation for message channels The `MessageChannel.send()` is, essentially, only the point in Spring Integration where we produce a message and can emit a `PRODUCER` kind span. * Implement `IntegrationObservation.PRODUCER` infrastructure based on the `MessageSenderContext` * Implement an observation emission in the `AbstractMessageChannel` based on the mentioned `IntegrationObservation.PRODUCER` * Build a `MutableMessage.of(message)` to be able to modify message header in the `MessageSenderContext` via tracer `Propagator` or other tracing injection instrument * Document which components are instrumented with an `ObservationRegistry` * Fix language in docs Co-authored-by: Gary Russell <[email protected]> Co-authored-by: Gary Russell <[email protected]>
1 parent 4d2a4ca commit 88e259c

File tree

9 files changed

+271
-41
lines changed

9 files changed

+271
-41
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

+68-25
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.integration.channel;
1818

1919
import java.util.ArrayDeque;
20+
import java.util.Arrays;
2021
import java.util.Collections;
2122
import java.util.Comparator;
2223
import java.util.Deque;
@@ -26,6 +27,8 @@
2627
import java.util.concurrent.ConcurrentHashMap;
2728
import java.util.concurrent.CopyOnWriteArrayList;
2829

30+
import io.micrometer.observation.ObservationRegistry;
31+
2932
import org.springframework.beans.factory.BeanFactory;
3033
import org.springframework.core.OrderComparator;
3134
import org.springframework.core.log.LogAccessor;
@@ -34,13 +37,18 @@
3437
import org.springframework.integration.context.IntegrationContextUtils;
3538
import org.springframework.integration.context.IntegrationObjectSupport;
3639
import org.springframework.integration.history.MessageHistory;
40+
import org.springframework.integration.support.MutableMessage;
3741
import org.springframework.integration.support.management.IntegrationManagedResource;
3842
import org.springframework.integration.support.management.IntegrationManagement;
3943
import org.springframework.integration.support.management.TrackableComponent;
4044
import org.springframework.integration.support.management.metrics.MeterFacade;
4145
import org.springframework.integration.support.management.metrics.MetricsCaptor;
4246
import org.springframework.integration.support.management.metrics.SampleFacade;
4347
import org.springframework.integration.support.management.metrics.TimerFacade;
48+
import org.springframework.integration.support.management.observation.DefaultMessageSenderObservationConvention;
49+
import org.springframework.integration.support.management.observation.IntegrationObservation;
50+
import org.springframework.integration.support.management.observation.MessageSenderContext;
51+
import org.springframework.integration.support.management.observation.MessageSenderObservationConvention;
4452
import org.springframework.integration.support.utils.IntegrationUtils;
4553
import org.springframework.lang.Nullable;
4654
import org.springframework.messaging.Message;
@@ -75,22 +83,27 @@ public abstract class AbstractMessageChannel extends IntegrationObjectSupport
7583

7684
protected final Set<MeterFacade> meters = ConcurrentHashMap.newKeySet(); // NOSONAR
7785

78-
private volatile boolean shouldTrack = false;
86+
private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
87+
88+
@Nullable
89+
private MessageSenderObservationConvention observationConvention;
7990

80-
private volatile Class<?>[] datatypes = new Class<?>[0];
91+
private boolean shouldTrack = false;
8192

82-
private volatile String fullChannelName;
93+
private Class<?>[] datatypes = new Class<?>[0];
8394

84-
private volatile MessageConverter messageConverter;
95+
private MessageConverter messageConverter;
8596

86-
private volatile boolean loggingEnabled = true;
97+
private boolean loggingEnabled = true;
8798

8899
private MetricsCaptor metricsCaptor;
89100

90101
private TimerFacade successTimer;
91102

92103
private TimerFacade failureTimer;
93104

105+
private volatile String fullChannelName;
106+
94107
@Override
95108
public String getComponentType() {
96109
return "channel";
@@ -138,10 +151,7 @@ public void setLoggingEnabled(boolean loggingEnabled) {
138151
* @see #setMessageConverter(MessageConverter)
139152
*/
140153
public void setDatatypes(Class<?>... datatypes) {
141-
this.datatypes =
142-
(datatypes != null && datatypes.length > 0)
143-
? datatypes
144-
: new Class<?>[0];
154+
this.datatypes = Arrays.copyOf(datatypes, datatypes.length);
145155
}
146156

147157
/**
@@ -192,6 +202,10 @@ public void setMessageConverter(MessageConverter messageConverter) {
192202
this.messageConverter = messageConverter;
193203
}
194204

205+
public void setObservationConvention(@Nullable MessageSenderObservationConvention observationConvention) {
206+
this.observationConvention = observationConvention;
207+
}
208+
195209
/**
196210
* Return a read-only list of the configured interceptors.
197211
*/
@@ -224,6 +238,12 @@ public ManagementOverrides getOverrides() {
224238
return this.managementOverrides;
225239
}
226240

241+
@Override
242+
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
243+
Assert.notNull(observationRegistry, "'observationRegistry' must not be null");
244+
this.observationRegistry = observationRegistry;
245+
}
246+
227247
@Override
228248
protected void onInit() {
229249
super.onInit();
@@ -276,15 +296,14 @@ public boolean send(Message<?> message) {
276296
* Send a message on this channel. If the channel is at capacity, this
277297
* method will block until either the timeout occurs or the sending thread
278298
* is interrupted. If the specified timeout is 0, the method will return
279-
* immediately. If less than zero, it will block indefinitely (see
280-
* {@link #send(Message)}).
299+
* immediately. If less than zero, it will block indefinitely (see {@link #send(Message)}).
281300
* @param messageArg the Message to send
282301
* @param timeout the timeout in milliseconds
283302
* @return <code>true</code> if the message is sent successfully,
284303
* <code>false</code> if the message cannot be sent within the allotted
285304
* time or the sending thread is interrupted.
286305
*/
287-
@Override // NOSONAR complexity
306+
@Override
288307
public boolean send(Message<?> messageArg, long timeout) {
289308
Assert.notNull(messageArg, "message must not be null");
290309
Assert.notNull(messageArg.getPayload(), "message payload must not be null");
@@ -293,11 +312,44 @@ public boolean send(Message<?> messageArg, long timeout) {
293312
message = MessageHistory.write(message, this, getMessageBuilderFactory());
294313
}
295314

315+
if (!ObservationRegistry.NOOP.equals(this.observationRegistry)) {
316+
return sendWithObservation(message, timeout);
317+
}
318+
else if (this.metricsCaptor != null) {
319+
return sendWithMetrics(message, timeout);
320+
}
321+
else {
322+
return sendInternal(message, timeout);
323+
}
324+
}
325+
326+
private boolean sendWithObservation(Message<?> message, long timeout) {
327+
MutableMessage<?> messageToSend = MutableMessage.of(message);
328+
return IntegrationObservation.PRODUCER.observation(
329+
this.observationConvention,
330+
DefaultMessageSenderObservationConvention.INSTANCE,
331+
() -> new MessageSenderContext(messageToSend, getComponentName()),
332+
this.observationRegistry)
333+
.observe(() -> sendInternal(messageToSend, timeout));
334+
}
335+
336+
private boolean sendWithMetrics(Message<?> message, long timeout) {
337+
SampleFacade sample = this.metricsCaptor.start();
338+
try {
339+
boolean sent = sendInternal(message, timeout);
340+
sample.stop(sendTimer(sent));
341+
return sent;
342+
}
343+
catch (RuntimeException ex) {
344+
sample.stop(buildSendTimer(false, ex.getClass().getSimpleName()));
345+
throw ex;
346+
}
347+
}
348+
349+
private boolean sendInternal(Message<?> message, long timeout) {
296350
Deque<ChannelInterceptor> interceptorStack = null;
297351
boolean sent = false;
298-
boolean metricsProcessed = false;
299352
ChannelInterceptorList interceptorList = this.interceptors;
300-
SampleFacade sample = null;
301353
try {
302354
message = convertPayloadIfNecessary(message);
303355
boolean debugEnabled = this.loggingEnabled && this.logger.isDebugEnabled();
@@ -311,14 +363,8 @@ public boolean send(Message<?> messageArg, long timeout) {
311363
return false;
312364
}
313365
}
314-
if (this.metricsCaptor != null) {
315-
sample = this.metricsCaptor.start();
316-
}
366+
317367
sent = doSend(message, timeout);
318-
if (sample != null) {
319-
sample.stop(sendTimer(sent));
320-
}
321-
metricsProcessed = true;
322368

323369
if (debugEnabled) {
324370
logger.debug("postSend (sent=" + sent + ") on channel '" + this + "', message: " + message);
@@ -330,9 +376,6 @@ public boolean send(Message<?> messageArg, long timeout) {
330376
return sent;
331377
}
332378
catch (Exception ex) {
333-
if (!metricsProcessed && sample != null) {
334-
sample.stop(buildSendTimer(false, ex.getClass().getSimpleName()));
335-
}
336379
if (interceptorStack != null) {
337380
interceptorList.afterSendCompletion(message, this, sent, ex, interceptorStack);
338381
}
@@ -411,7 +454,7 @@ private Message<?> convertPayloadIfNecessary(Message<?> message) {
411454
* accepted or the blocking thread is interrupted.
412455
* @param message The message.
413456
* @param timeout The timeout.
414-
* @return true if the send was successful.
457+
* @return true if the {@code send} was successful.
415458
*/
416459
protected abstract boolean doSend(Message<?> message, long timeout);
417460

spring-integration-core/src/main/java/org/springframework/integration/support/MutableMessage.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,7 @@ public boolean equals(Object obj) {
113113
if (this == obj) {
114114
return true;
115115
}
116-
if (obj instanceof MutableMessage<?>) {
117-
MutableMessage<?> other = (MutableMessage<?>) obj;
116+
if (obj instanceof MutableMessage<?> other) {
118117
UUID thisId = this.headers.getId();
119118
UUID otherId = other.headers.getId();
120119
return (ObjectUtils.nullSafeEquals(thisId, otherId) &&
@@ -123,4 +122,18 @@ public boolean equals(Object obj) {
123122
return false;
124123
}
125124

125+
/**
126+
* Build a new {@link MutableMessage} based on the provided message
127+
* if that one is not already a {@link MutableMessage}.
128+
* @param message the message to build from.
129+
* @return new {@link MutableMessage}.
130+
* @since 6.0
131+
*/
132+
public static MutableMessage<?> of(Message<?> message) {
133+
if (message instanceof MutableMessage) {
134+
return (MutableMessage<?>) message;
135+
}
136+
return new MutableMessage<>(message.getPayload(), message.getHeaders());
137+
}
138+
126139
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.support.management.observation;
18+
19+
import io.micrometer.common.KeyValues;
20+
21+
/**
22+
* A default {@link MessageSenderObservationConvention} implementation.
23+
* Provides low cardinalities as a {@link IntegrationObservation.ProducerTags} values.
24+
*
25+
* @author Artem Bilan
26+
*
27+
* @since 6.0
28+
*/
29+
public class DefaultMessageSenderObservationConvention implements MessageSenderObservationConvention {
30+
31+
/**
32+
* A shared singleton instance for {@link DefaultMessageSenderObservationConvention}.
33+
*/
34+
public static final DefaultMessageSenderObservationConvention INSTANCE =
35+
new DefaultMessageSenderObservationConvention();
36+
37+
38+
@Override
39+
public KeyValues getLowCardinalityKeyValues(MessageSenderContext context) {
40+
return KeyValues
41+
// See IntegrationObservation.ProducerTags.COMPONENT_NAME - to avoid class tangle
42+
.of("spring.integration.name", context.getProducerName())
43+
// See IntegrationObservation.ProducerTags.COMPONENT_TYPE - to avoid class tangle
44+
.and("spring.integration.type", "producer");
45+
}
46+
}

spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/IntegrationObservation.java

+50
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,27 @@ public KeyName[] getLowCardinalityKeyNames() {
7171
return GatewayTags.values();
7272
}
7373

74+
},
75+
76+
/**
77+
* Observation for message producers, e.g. channels.
78+
*/
79+
PRODUCER {
80+
@Override
81+
public String getPrefix() {
82+
return "spring.integration.";
83+
}
84+
85+
@Override
86+
public Class<DefaultMessageSenderObservationConvention> getDefaultConvention() {
87+
return DefaultMessageSenderObservationConvention.class;
88+
}
89+
90+
@Override
91+
public KeyName[] getLowCardinalityKeyNames() {
92+
return ProducerTags.values();
93+
}
94+
7495
};
7596

7697
/**
@@ -141,4 +162,33 @@ public String asString() {
141162

142163
}
143164

165+
/**
166+
* Key names for message producer observations.
167+
*/
168+
public enum ProducerTags implements KeyName {
169+
170+
/**
171+
* Name of the message handler component.
172+
*/
173+
COMPONENT_NAME {
174+
@Override
175+
public String asString() {
176+
return "spring.integration.name";
177+
}
178+
179+
},
180+
181+
/**
182+
* Type of the component - 'producer'.
183+
*/
184+
COMPONENT_TYPE {
185+
@Override
186+
public String asString() {
187+
return "spring.integration.type";
188+
}
189+
190+
}
191+
192+
}
193+
144194
}

spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageSenderContext.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,23 @@
3030
*/
3131
public class MessageSenderContext extends SenderContext<MutableMessage<?>> {
3232

33-
public MessageSenderContext(MutableMessage<?> message) {
33+
private final MutableMessage<?> message;
34+
35+
private final String producerName;
36+
37+
public MessageSenderContext(MutableMessage<?> message, String producerName) {
3438
super((carrier, key, value) -> carrier.getHeaders().put(key, value));
35-
setCarrier(message);
39+
this.message = message;
40+
this.producerName = producerName;
41+
}
42+
43+
@Override
44+
public MutableMessage<?> getCarrier() {
45+
return this.message;
46+
}
47+
48+
public String getProducerName() {
49+
return this.producerName;
3650
}
3751

3852
}

0 commit comments

Comments
 (0)