Skip to content

Commit 89c5dfa

Browse files
committed
GH-9743: Add observation to the SourcePollingChannelAdapter
Fixes: #9743 Spring Integration provides observation for the `MessageChannel`, `MessageHandler` and `MessageProducerSupport`. The `SourcePollingChannelAdapter` is missing, and it is that only special endpoint which deals with `MessageSource` implementations via scheduled tasks in the poller. Essentially, this endpoint is a start of the flow, but it still is a consumer of data from the source system. * Add an `Observation` logic to the `SourcePollingChannelAdapter`. * Divide it into two phases: start (and open scope) when message is received; stop (and close scope) when the whole polling task for a message is done. We need this separation because of transaction scope for the polling task. At the same time we don't want to emit an observation for a void polling task. * Change `MessageReceiverContext` to accept a `handlerType`. The `MessageHandler` contributes a `handler`. The new support in the `SourcePollingChannelAdapter` - `message-source`. And change `MessageProducerSupport` to contribute a `message-producer` * Verify the single trace is supported for the whole flow (including transaction synchronization) starting from a `SourcePollingChannelAdapter` in a new `SourcePollingChannelAdapterObservationTests` * Document this new feature
1 parent 177bda5 commit 89c5dfa

File tree

8 files changed

+295
-39
lines changed

8 files changed

+295
-39
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
5252
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
5353
import org.springframework.jmx.export.annotation.ManagedAttribute;
54+
import org.springframework.lang.Nullable;
5455
import org.springframework.messaging.Message;
5556
import org.springframework.messaging.MessageChannel;
5657
import org.springframework.messaging.MessagingException;
@@ -416,10 +417,12 @@ private Flux<Message<?>> createFluxGenerator() {
416417
}
417418

418419
private Message<?> pollForMessage() {
420+
Exception pollingTaskError = null;
419421
try {
420422
return this.pollingTask.call();
421423
}
422424
catch (Exception ex) {
425+
pollingTaskError = ex;
423426
if (ex instanceof MessagingException) { // NOSONAR
424427
throw (MessagingException) ex;
425428
}
@@ -441,6 +444,7 @@ private Message<?> pollForMessage() {
441444
TransactionSynchronizationManager.unbindResource(resource);
442445
}
443446
}
447+
donePollingTask(pollingTaskError);
444448
}
445449
}
446450

@@ -471,7 +475,7 @@ private Message<?> doPoll() {
471475
return message;
472476
}
473477

474-
private void messageReceived(IntegrationResourceHolder holder, Message<?> message) {
478+
protected void messageReceived(@Nullable IntegrationResourceHolder holder, Message<?> message) {
475479
this.logger.debug(() -> "Poll resulted in Message: " + message);
476480
if (holder != null) {
477481
holder.setMessage(message);
@@ -490,6 +494,10 @@ private void messageReceived(IntegrationResourceHolder holder, Message<?> messag
490494
}
491495
}
492496

497+
protected void donePollingTask(@Nullable Exception pollingTaskError) {
498+
499+
}
500+
493501
@Override // guarded by super#lifecycleLock
494502
protected void doStop() {
495503
if (this.runningTask != null) {
@@ -536,6 +544,7 @@ protected String getResourceKey() {
536544
return null;
537545
}
538546

547+
@Nullable
539548
private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Object resource) {
540549
if (this.transactionSynchronizationFactory != null && resource != null &&
541550
TransactionSynchronizationManager.isActualTransactionActive()) {

spring-integration-core/src/main/java/org/springframework/integration/endpoint/MessageProducerSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -256,7 +256,7 @@ protected void sendMessage(Message<?> message) {
256256
IntegrationObservation.HANDLER.observation(
257257
this.observationConvention,
258258
DefaultMessageReceiverObservationConvention.INSTANCE,
259-
() -> new MessageReceiverContext(message, getComponentName()),
259+
() -> new MessageReceiverContext(message, getComponentName(), "message-producer"),
260260
this.observationRegistry)
261261
.observe(() -> this.messagingTemplate.send(getRequiredOutputChannel(), trackMessageIfAny(message)));
262262
}

spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java

Lines changed: 93 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,8 +19,12 @@
1919
import java.util.concurrent.locks.Lock;
2020
import java.util.concurrent.locks.ReentrantLock;
2121

22+
import io.micrometer.observation.Observation;
23+
import io.micrometer.observation.ObservationRegistry;
24+
2225
import org.springframework.aop.framework.Advised;
2326
import org.springframework.beans.factory.BeanCreationException;
27+
import org.springframework.beans.factory.BeanFactory;
2428
import org.springframework.context.Lifecycle;
2529
import org.springframework.integration.StaticMessageHeaderAccessor;
2630
import org.springframework.integration.acks.AckUtils;
@@ -31,16 +35,21 @@
3135
import org.springframework.integration.core.MessagingTemplate;
3236
import org.springframework.integration.history.MessageHistory;
3337
import org.springframework.integration.support.context.NamedComponent;
38+
import org.springframework.integration.support.management.IntegrationManagement;
3439
import org.springframework.integration.support.management.TrackableComponent;
40+
import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention;
41+
import org.springframework.integration.support.management.observation.IntegrationObservation;
42+
import org.springframework.integration.support.management.observation.MessageReceiverContext;
43+
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
3544
import org.springframework.integration.transaction.IntegrationResourceHolder;
45+
import org.springframework.lang.Nullable;
3646
import org.springframework.messaging.Message;
3747
import org.springframework.messaging.MessageChannel;
3848
import org.springframework.messaging.MessagingException;
3949
import org.springframework.util.Assert;
4050

4151
/**
42-
* A Channel Adapter implementation for connecting a
43-
* {@link MessageSource} to a {@link MessageChannel}.
52+
* A Channel Adapter implementation for connecting a {@link MessageSource} to a {@link MessageChannel}.
4453
*
4554
* @author Mark Fisher
4655
* @author Oleg Zhurakousky
@@ -49,12 +58,16 @@
4958
* @author Christian Tzolov
5059
*/
5160
public class SourcePollingChannelAdapter extends AbstractPollingEndpoint
52-
implements TrackableComponent {
61+
implements TrackableComponent, IntegrationManagement {
5362

5463
private final MessagingTemplate messagingTemplate = new MessagingTemplate();
5564

5665
private MessageSource<?> originalSource;
5766

67+
private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
68+
69+
private MessageReceiverObservationConvention observationConvention;
70+
5871
private volatile MessageSource<?> source;
5972

6073
private volatile MessageChannel outputChannel;
@@ -67,7 +80,6 @@ public class SourcePollingChannelAdapter extends AbstractPollingEndpoint
6780

6881
/**
6982
* Specify the source to be polled for Messages.
70-
*
7183
* @param source The message source.
7284
*/
7385
public void setSource(MessageSource<?> source) {
@@ -76,14 +88,13 @@ public void setSource(MessageSource<?> source) {
7688
Object target = extractProxyTarget(source);
7789
this.originalSource = target != null ? (MessageSource<?>) target : source;
7890

79-
if (source instanceof ExpressionCapable) {
80-
setPrimaryExpression(((ExpressionCapable) source).getExpression());
91+
if (source instanceof ExpressionCapable expressionCapable) {
92+
setPrimaryExpression(expressionCapable.getExpression());
8193
}
8294
}
8395

8496
/**
8597
* Specify the {@link MessageChannel} where Messages should be sent.
86-
*
8798
* @param outputChannel The output channel.
8899
*/
89100
public void setOutputChannel(MessageChannel outputChannel) {
@@ -105,9 +116,7 @@ public void setOutputChannelName(String outputChannelName) {
105116
}
106117

107118
/**
108-
* Specify the maximum time to wait for a Message to be sent to the
109-
* output channel.
110-
*
119+
* Specify the maximum time to wait for a Message to be sent to the output channel.
111120
* @param sendTimeout The send timeout.
112121
*/
113122
public void setSendTimeout(long sendTimeout) {
@@ -116,18 +125,38 @@ public void setSendTimeout(long sendTimeout) {
116125

117126
/**
118127
* Specify whether this component should be tracked in the Message History.
119-
*
120128
* @param shouldTrack true if the component should be tracked.
121129
*/
122130
@Override
123131
public void setShouldTrack(boolean shouldTrack) {
124132
this.shouldTrack = shouldTrack;
125133
}
126134

135+
@Override
136+
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
137+
this.observationRegistry = observationRegistry;
138+
}
139+
140+
/**
141+
* Set a custom {@link MessageReceiverObservationConvention} for {@link IntegrationObservation#HANDLER}.
142+
* Ignored if an {@link ObservationRegistry} is not configured for this component.
143+
* @param observationConvention the {@link MessageReceiverObservationConvention} to use.
144+
* @since 6.5
145+
*/
146+
public void setObservationConvention(@Nullable MessageReceiverObservationConvention observationConvention) {
147+
this.observationConvention = observationConvention;
148+
}
149+
150+
@Override
151+
public boolean isObserved() {
152+
return !ObservationRegistry.NOOP.equals(this.observationRegistry);
153+
}
154+
127155
@Override
128156
public String getComponentType() {
129-
return (this.source instanceof NamedComponent) ?
130-
((NamedComponent) this.source).getComponentType() : "inbound-channel-adapter";
157+
return (this.source instanceof NamedComponent namedComponent)
158+
? namedComponent.getComponentType()
159+
: "inbound-channel-adapter";
131160
}
132161

133162
@Override
@@ -147,8 +176,8 @@ protected final void setReceiveMessageSource(Object source) {
147176

148177
@Override
149178
protected void doStart() {
150-
if (this.source instanceof Lifecycle) {
151-
((Lifecycle) this.source).start();
179+
if (this.source instanceof Lifecycle lifecycle) {
180+
lifecycle.start();
152181
}
153182
super.doStart();
154183

@@ -160,8 +189,8 @@ protected void doStart() {
160189
@Override
161190
protected void doStop() {
162191
super.doStop();
163-
if (this.source instanceof Lifecycle) {
164-
((Lifecycle) this.source).stop();
192+
if (this.source instanceof Lifecycle lifecycle) {
193+
lifecycle.stop();
165194
}
166195
}
167196

@@ -172,8 +201,9 @@ protected void onInit() {
172201
|| (this.outputChannelName != null && this.outputChannel == null),
173202
"One and only one of 'outputChannelName' or 'outputChannel' is required.");
174203
super.onInit();
175-
if (this.getBeanFactory() != null) {
176-
this.messagingTemplate.setBeanFactory(this.getBeanFactory());
204+
BeanFactory beanFactory = getBeanFactory();
205+
if (beanFactory != null) {
206+
this.messagingTemplate.setBeanFactory(beanFactory);
177207
}
178208
}
179209

@@ -204,13 +234,13 @@ protected void handleMessage(Message<?> messageArg) {
204234
this.messagingTemplate.send(getOutputChannel(), message);
205235
AckUtils.autoAck(ackCallback);
206236
}
207-
catch (Exception e) {
237+
catch (Exception ex) {
208238
AckUtils.autoNack(ackCallback);
209-
if (e instanceof MessagingException) { // NOSONAR
210-
throw (MessagingException) e;
239+
if (ex instanceof MessagingException messagingException) { // NOSONAR
240+
throw messagingException;
211241
}
212242
else {
213-
throw new MessagingException(message, "Failed to send Message", e);
243+
throw new MessagingException(message, "Failed to send Message", ex);
214244
}
215245
}
216246
}
@@ -220,6 +250,41 @@ protected Message<?> receiveMessage() {
220250
return this.source.receive();
221251
}
222252

253+
/**
254+
* Start an observation (and open scope) for the received message.
255+
* @param holder the resource holder for this component.
256+
* @param message the received message.
257+
*/
258+
@Override
259+
protected void messageReceived(@Nullable IntegrationResourceHolder holder, Message<?> message) {
260+
Observation observation =
261+
IntegrationObservation.HANDLER.observation(this.observationConvention,
262+
DefaultMessageReceiverObservationConvention.INSTANCE,
263+
() -> new MessageReceiverContext(message, getComponentName(), "message-source"),
264+
this.observationRegistry);
265+
266+
observation.start().openScope();
267+
super.messageReceived(holder, message);
268+
}
269+
270+
/**
271+
* Stop an observation (and close its scope) previously started
272+
* from the {@link #messageReceived(IntegrationResourceHolder, Message)}.
273+
* @param pollingTaskError an optional error as a result of the polling task.
274+
*/
275+
@Override
276+
protected void donePollingTask(@Nullable Exception pollingTaskError) {
277+
Observation.Scope currentObservationScope = this.observationRegistry.getCurrentObservationScope();
278+
if (currentObservationScope != null) {
279+
currentObservationScope.close();
280+
Observation currentObservation = currentObservationScope.getCurrentObservation();
281+
if (pollingTaskError != null) {
282+
currentObservation.error(pollingTaskError);
283+
}
284+
currentObservation.stop();
285+
}
286+
}
287+
223288
@Override
224289
protected Object getResourceToBind() {
225290
return this.originalSource;
@@ -230,16 +295,16 @@ protected String getResourceKey() {
230295
return IntegrationResourceHolder.MESSAGE_SOURCE;
231296
}
232297

298+
@Nullable
233299
private static Object extractProxyTarget(Object target) {
234-
if (!(target instanceof Advised)) {
300+
if (!(target instanceof Advised advised)) {
235301
return target;
236302
}
237-
Advised advised = (Advised) target;
238303
try {
239304
return extractProxyTarget(advised.getTargetSource().getTarget());
240305
}
241-
catch (Exception e) {
242-
throw new BeanCreationException("Could not extract target", e);
306+
catch (Exception ex) {
307+
throw new BeanCreationException("Could not extract target", ex);
243308
}
244309
}
245310

spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/DefaultMessageReceiverObservationConvention.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -40,7 +40,7 @@ public KeyValues getLowCardinalityKeyValues(MessageReceiverContext context) {
4040
// See IntegrationObservation.HandlerTags.COMPONENT_NAME - to avoid class tangle
4141
.of("spring.integration.name", context.getHandlerName())
4242
// See IntegrationObservation.HandlerTags.COMPONENT_TYPE - to avoid class tangle
43-
.and("spring.integration.type", "handler");
43+
.and("spring.integration.type", context.getHandlerType());
4444
}
4545

4646
}

spring-integration-core/src/main/java/org/springframework/integration/support/management/observation/MessageReceiverContext.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022-2024 the original author or authors.
2+
* Copyright 2022-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,10 +36,24 @@ public class MessageReceiverContext extends ReceiverContext<Message<?>> {
3636

3737
private final String handlerName;
3838

39+
private final String handlerType;
40+
3941
public MessageReceiverContext(Message<?> message, @Nullable String handlerName) {
42+
this(message, handlerName, "handler");
43+
}
44+
45+
/**
46+
* Construct an instance based on the message, the handler (or source, producer) bean name and handler type.
47+
* @param message the received message for this context.
48+
* @param handlerName the handler (or source, producer) bean name processing the message.
49+
* @param handlerType the handler type: {@code handler}, or {@code message-source}, or {@code message-producer}.
50+
* @since 6.5
51+
*/
52+
public MessageReceiverContext(Message<?> message, @Nullable String handlerName, String handlerType) {
4053
super(MessageReceiverContext::getHeader);
4154
this.message = message;
4255
this.handlerName = handlerName != null ? handlerName : "unknown";
56+
this.handlerType = handlerType;
4357
}
4458

4559
@Override
@@ -51,6 +65,10 @@ public String getHandlerName() {
5165
return this.handlerName;
5266
}
5367

68+
public String getHandlerType() {
69+
return this.handlerType;
70+
}
71+
5472
@Nullable
5573
private static String getHeader(Message<?> message, String key) {
5674
Object value = message.getHeaders().get(key);

0 commit comments

Comments
 (0)