diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java index 15be07aa639..2242e8b0606 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java @@ -17,6 +17,7 @@ package org.springframework.integration.channel; import java.util.ArrayDeque; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.Deque; @@ -26,6 +27,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import io.micrometer.observation.ObservationRegistry; + import org.springframework.beans.factory.BeanFactory; import org.springframework.core.OrderComparator; import org.springframework.core.log.LogAccessor; @@ -34,6 +37,7 @@ import org.springframework.integration.context.IntegrationContextUtils; import org.springframework.integration.context.IntegrationObjectSupport; import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.support.MutableMessage; import org.springframework.integration.support.management.IntegrationManagedResource; import org.springframework.integration.support.management.IntegrationManagement; import org.springframework.integration.support.management.TrackableComponent; @@ -41,6 +45,10 @@ import org.springframework.integration.support.management.metrics.MetricsCaptor; import org.springframework.integration.support.management.metrics.SampleFacade; import org.springframework.integration.support.management.metrics.TimerFacade; +import org.springframework.integration.support.management.observation.DefaultMessageSenderObservationConvention; +import org.springframework.integration.support.management.observation.IntegrationObservation; +import org.springframework.integration.support.management.observation.MessageSenderContext; +import org.springframework.integration.support.management.observation.MessageSenderObservationConvention; import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; @@ -75,15 +83,18 @@ public abstract class AbstractMessageChannel extends IntegrationObjectSupport protected final Set meters = ConcurrentHashMap.newKeySet(); // NOSONAR - private volatile boolean shouldTrack = false; + private ObservationRegistry observationRegistry = ObservationRegistry.NOOP; + + @Nullable + private MessageSenderObservationConvention observationConvention; - private volatile Class[] datatypes = new Class[0]; + private boolean shouldTrack = false; - private volatile String fullChannelName; + private Class[] datatypes = new Class[0]; - private volatile MessageConverter messageConverter; + private MessageConverter messageConverter; - private volatile boolean loggingEnabled = true; + private boolean loggingEnabled = true; private MetricsCaptor metricsCaptor; @@ -91,6 +102,8 @@ public abstract class AbstractMessageChannel extends IntegrationObjectSupport private TimerFacade failureTimer; + private volatile String fullChannelName; + @Override public String getComponentType() { return "channel"; @@ -138,10 +151,7 @@ public void setLoggingEnabled(boolean loggingEnabled) { * @see #setMessageConverter(MessageConverter) */ public void setDatatypes(Class... datatypes) { - this.datatypes = - (datatypes != null && datatypes.length > 0) - ? datatypes - : new Class[0]; + this.datatypes = Arrays.copyOf(datatypes, datatypes.length); } /** @@ -192,6 +202,10 @@ public void setMessageConverter(MessageConverter messageConverter) { this.messageConverter = messageConverter; } + public void setObservationConvention(@Nullable MessageSenderObservationConvention observationConvention) { + this.observationConvention = observationConvention; + } + /** * Return a read-only list of the configured interceptors. */ @@ -224,6 +238,12 @@ public ManagementOverrides getOverrides() { return this.managementOverrides; } + @Override + public void registerObservationRegistry(ObservationRegistry observationRegistry) { + Assert.notNull(observationRegistry, "'observationRegistry' must not be null"); + this.observationRegistry = observationRegistry; + } + @Override protected void onInit() { super.onInit(); @@ -276,15 +296,14 @@ public boolean send(Message message) { * Send a message on this channel. If the channel is at capacity, this * method will block until either the timeout occurs or the sending thread * is interrupted. If the specified timeout is 0, the method will return - * immediately. If less than zero, it will block indefinitely (see - * {@link #send(Message)}). + * immediately. If less than zero, it will block indefinitely (see {@link #send(Message)}). * @param messageArg the Message to send * @param timeout the timeout in milliseconds * @return true if the message is sent successfully, * false if the message cannot be sent within the allotted * time or the sending thread is interrupted. */ - @Override // NOSONAR complexity + @Override public boolean send(Message messageArg, long timeout) { Assert.notNull(messageArg, "message must not be null"); Assert.notNull(messageArg.getPayload(), "message payload must not be null"); @@ -293,11 +312,44 @@ public boolean send(Message messageArg, long timeout) { message = MessageHistory.write(message, this, getMessageBuilderFactory()); } + if (!ObservationRegistry.NOOP.equals(this.observationRegistry)) { + return sendWithObservation(message, timeout); + } + else if (this.metricsCaptor != null) { + return sendWithMetrics(message, timeout); + } + else { + return sendInternal(message, timeout); + } + } + + private boolean sendWithObservation(Message message, long timeout) { + MutableMessage messageToSend = MutableMessage.of(message); + return IntegrationObservation.PRODUCER.observation( + this.observationConvention, + DefaultMessageSenderObservationConvention.INSTANCE, + () -> new MessageSenderContext(messageToSend, getComponentName()), + this.observationRegistry) + .observe(() -> sendInternal(messageToSend, timeout)); + } + + private boolean sendWithMetrics(Message message, long timeout) { + SampleFacade sample = this.metricsCaptor.start(); + try { + boolean sent = sendInternal(message, timeout); + sample.stop(sendTimer(sent)); + return sent; + } + catch (RuntimeException ex) { + sample.stop(buildSendTimer(false, ex.getClass().getSimpleName())); + throw ex; + } + } + + private boolean sendInternal(Message message, long timeout) { Deque interceptorStack = null; boolean sent = false; - boolean metricsProcessed = false; ChannelInterceptorList interceptorList = this.interceptors; - SampleFacade sample = null; try { message = convertPayloadIfNecessary(message); boolean debugEnabled = this.loggingEnabled && this.logger.isDebugEnabled(); @@ -311,14 +363,8 @@ public boolean send(Message messageArg, long timeout) { return false; } } - if (this.metricsCaptor != null) { - sample = this.metricsCaptor.start(); - } + sent = doSend(message, timeout); - if (sample != null) { - sample.stop(sendTimer(sent)); - } - metricsProcessed = true; if (debugEnabled) { logger.debug("postSend (sent=" + sent + ") on channel '" + this + "', message: " + message); @@ -330,9 +376,6 @@ public boolean send(Message messageArg, long timeout) { return sent; } catch (Exception ex) { - if (!metricsProcessed && sample != null) { - sample.stop(buildSendTimer(false, ex.getClass().getSimpleName())); - } if (interceptorStack != null) { interceptorList.afterSendCompletion(message, this, sent, ex, interceptorStack); } @@ -411,7 +454,7 @@ private Message convertPayloadIfNecessary(Message message) { * accepted or the blocking thread is interrupted. * @param message The message. * @param timeout The timeout. - * @return true if the send was successful. + * @return true if the {@code send} was successful. */ protected abstract boolean doSend(Message message, long timeout); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/MutableMessage.java b/spring-integration-core/src/main/java/org/springframework/integration/support/MutableMessage.java index 1695096ae9e..c54ad22a97b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/MutableMessage.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/MutableMessage.java @@ -113,8 +113,7 @@ public boolean equals(Object obj) { if (this == obj) { return true; } - if (obj instanceof MutableMessage) { - MutableMessage other = (MutableMessage) obj; + if (obj instanceof MutableMessage other) { UUID thisId = this.headers.getId(); UUID otherId = other.headers.getId(); return (ObjectUtils.nullSafeEquals(thisId, otherId) && @@ -123,4 +122,18 @@ public boolean equals(Object obj) { return false; } + /** + * Build a new {@link MutableMessage} based on the provided message + * if that one is not already a {@link MutableMessage}. + * @param message the message to build from. + * @return new {@link MutableMessage}. + * @since 6.0 + */ + public static MutableMessage of(Message message) { + if (message instanceof MutableMessage) { + return (MutableMessage) message; + } + return new MutableMessage<>(message.getPayload(), message.getHeaders()); + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/DefaultMessageSenderObservationConvention.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/DefaultMessageSenderObservationConvention.java new file mode 100644 index 00000000000..8aca73980e4 --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/DefaultMessageSenderObservationConvention.java @@ -0,0 +1,46 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.management.observation; + +import io.micrometer.common.KeyValues; + +/** + * A default {@link MessageSenderObservationConvention} implementation. + * Provides low cardinalities as a {@link IntegrationObservation.ProducerTags} values. + * + * @author Artem Bilan + * + * @since 6.0 + */ +public class DefaultMessageSenderObservationConvention implements MessageSenderObservationConvention { + + /** + * A shared singleton instance for {@link DefaultMessageSenderObservationConvention}. + */ + public static final DefaultMessageSenderObservationConvention INSTANCE = + new DefaultMessageSenderObservationConvention(); + + + @Override + public KeyValues getLowCardinalityKeyValues(MessageSenderContext context) { + return KeyValues + // See IntegrationObservation.ProducerTags.COMPONENT_NAME - to avoid class tangle + .of("spring.integration.name", context.getProducerName()) + // See IntegrationObservation.ProducerTags.COMPONENT_TYPE - to avoid class tangle + .and("spring.integration.type", "producer"); + } +} diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/IntegrationObservation.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/IntegrationObservation.java index 7219fb41f1e..8510269eb7f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/IntegrationObservation.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/IntegrationObservation.java @@ -71,6 +71,27 @@ public KeyName[] getLowCardinalityKeyNames() { return GatewayTags.values(); } + }, + + /** + * Observation for message producers, e.g. channels. + */ + PRODUCER { + @Override + public String getPrefix() { + return "spring.integration."; + } + + @Override + public Class getDefaultConvention() { + return DefaultMessageSenderObservationConvention.class; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return ProducerTags.values(); + } + }; /** @@ -141,4 +162,33 @@ public String asString() { } + /** + * Key names for message producer observations. + */ + public enum ProducerTags implements KeyName { + + /** + * Name of the message handler component. + */ + COMPONENT_NAME { + @Override + public String asString() { + return "spring.integration.name"; + } + + }, + + /** + * Type of the component - 'producer'. + */ + COMPONENT_TYPE { + @Override + public String asString() { + return "spring.integration.type"; + } + + } + + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageSenderContext.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageSenderContext.java index aab70cc4882..284742fcdcd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageSenderContext.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageSenderContext.java @@ -30,9 +30,23 @@ */ public class MessageSenderContext extends SenderContext> { - public MessageSenderContext(MutableMessage message) { + private final MutableMessage message; + + private final String producerName; + + public MessageSenderContext(MutableMessage message, String producerName) { super((carrier, key, value) -> carrier.getHeaders().put(key, value)); - setCarrier(message); + this.message = message; + this.producerName = producerName; + } + + @Override + public MutableMessage getCarrier() { + return this.message; + } + + public String getProducerName() { + return this.producerName; } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageSenderObservationConvention.java b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageSenderObservationConvention.java new file mode 100644 index 00000000000..1eec7a06aab --- /dev/null +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageSenderObservationConvention.java @@ -0,0 +1,46 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.support.management.observation; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationConvention; + +/** + * A {@link MessageSenderContext}-based {@link ObservationConvention} contract. + * + * @author Artem Bilan + * + * @since 6.0 + */ +public interface MessageSenderObservationConvention extends ObservationConvention { + + @Override + default String getName() { + return "spring.integration.producer"; + } + + @Override + default boolean supportsContext(Observation.Context context) { + return context instanceof MessageSenderContext; + } + + @Override + default String getContextualName(MessageSenderContext context) { + return context.getProducerName() + " send"; + } + +} diff --git a/spring-integration-core/src/test/java/org/springframework/integration/channel/interceptor/ObservationPropagationChannelInterceptorTests.java b/spring-integration-core/src/test/java/org/springframework/integration/channel/interceptor/ObservationPropagationChannelInterceptorTests.java index 82f7091bafd..7295d8a8ab8 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/channel/interceptor/ObservationPropagationChannelInterceptorTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/channel/interceptor/ObservationPropagationChannelInterceptorTests.java @@ -58,10 +58,8 @@ import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.config.GlobalChannelInterceptor; import org.springframework.integration.handler.BridgeHandler; -import org.springframework.integration.support.MutableMessage; -import org.springframework.integration.support.MutableMessageBuilder; +import org.springframework.integration.support.MessageBuilder; import org.springframework.integration.support.management.observation.IntegrationObservation; -import org.springframework.integration.support.management.observation.MessageSenderContext; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; @@ -215,13 +213,12 @@ void observationContextPropagatedOverExecutorChannel() { QueueChannel replyChannel = new QueueChannel(); - MutableMessage message = - (MutableMessage) MutableMessageBuilder.withPayload("test") + Message message = + MessageBuilder.withPayload("test") .setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel) .build(); - Observation.createNotStarted("sending", () -> new MessageSenderContext(message), this.observationRegistry) - .observe(() -> this.testTracingChannel.send(message)); + this.testTracingChannel.send(message); Message receive = replyChannel.receive(); @@ -238,7 +235,11 @@ void observationContextPropagatedOverExecutorChannel() { .reportedSpans() .hasSize(2) .satisfies(simpleSpans -> SpansAssert.assertThat(simpleSpans) - .hasASpanWithName("sending") + .assertThatASpanWithNameEqualTo("testTracingChannel send") + .hasTag("spring.integration.type", "producer") + .hasTag("spring.integration.name", "testTracingChannel") + .hasKindEqualTo(Span.Kind.PRODUCER) + .backToSpans() .assertThatASpanWithNameEqualTo("testBridge receive") .hasTag("foo", "some foo value") .hasTag("bar", "some bar value") @@ -315,8 +316,10 @@ public DirectChannel testConsumer() { } @Bean - public ExecutorChannel testTracingChannel() { - return new ExecutorChannel(Executors.newSingleThreadExecutor()); + public ExecutorChannel testTracingChannel(ObservationRegistry observationRegistry) { + ExecutorChannel channel = new ExecutorChannel(Executors.newSingleThreadExecutor()); + channel.registerObservationRegistry(observationRegistry); + return channel; } @Bean diff --git a/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java b/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java index d0368a3abc8..9f464ce2d91 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/support/management/observation/IntegrationObservabilityZipkinTests.java @@ -92,7 +92,11 @@ public SampleTestRunnerConsumer yourCode() { .hasTag(IntegrationObservation.HandlerTags.COMPONENT_NAME.asString(), "observedEndpoint") .hasTag(IntegrationObservation.HandlerTags.COMPONENT_TYPE.asString(), "handler") .hasKindEqualTo(Span.Kind.CONSUMER)) - .hasSize(2); + .hasASpanWithName("queueChannel send", spanAssert -> spanAssert + .hasTag(IntegrationObservation.ProducerTags.COMPONENT_NAME.asString(), "queueChannel") + .hasTag(IntegrationObservation.ProducerTags.COMPONENT_TYPE.asString(), "producer") + .hasKindEqualTo(Span.Kind.PRODUCER)) + .hasSize(3); MeterRegistryAssert.assertThat(getMeterRegistry()) .hasTimerWithNameAndTags("spring.integration.handler", @@ -108,7 +112,7 @@ public SampleTestRunnerConsumer yourCode() { @EnableIntegration @EnableIntegrationManagement( observationPatterns = { - "${spring.integration.management.observation-patterns:observedEndpoint,testInboundGateway}", + "${spring.integration.management.observation-patterns:testInboundGateway,queueChannel,observedEndpoint}", "${spring.integration.management.observation-patterns:}" }) public static class ObservationIntegrationTestConfiguration { diff --git a/src/reference/asciidoc/metrics.adoc b/src/reference/asciidoc/metrics.adoc index 3b1be9b9800..455584a2699 100644 --- a/src/reference/asciidoc/metrics.adoc +++ b/src/reference/asciidoc/metrics.adoc @@ -159,6 +159,17 @@ Can be configured as `*` to match all components. The meters are not gathered in this case independently, but delegated to an appropriate `ObservationHandler` configured on the provided `ObservationRegistry`. +The following Spring Integration components are instrumented with observation logic each with a respective convention: + +* `MessageProducerSupport`, being the inbound endpoint of the flow, is considered as a `CONSUMER` span type and uses the `IntegrationObservation.HANDLER` API; +* MessagingGatewaySupport` is an inbound request-reply endpoint, and is considered as a `SERVER` span type. +It uses the `IntegrationObservation.GATEWAY` API; +* An `AbstractMessageChannel.send()` operation is the only Spring Integration API where it produces messages. +So, it is treated as a `PRODUCER` span type and uses the `IntegrationObservation.PRODCUER` API. +This makes more sense when a channel is a distributed implementation (e.g. `PublishSubscribeKafkaChannel` or `ZeroMqChannel`) and trace information has to be added to the message. +So, the `IntegrationObservation.PRODUCER` observation is based on a `MessageSenderContext` where Spring Integration supplies a `MutableMessage` to allow a subsequent tracing `Propagator` to add headers so they are available to the consumer; +* An `AbstractMessageHandler` is a `CONSUMER` span type and uses the `IntegrationObservation.HANDLER` API. + An observation production on the `IntegrationManagement` components can be customized via `ObservationConvention` configuration. For example an `AbstractMessageHandler` expects a `MessageReceiverObservationConvention` via its `setObservationConvention()` API.