Skip to content

Commit 2731e94

Browse files
authored
GH-7925: Make message history header as mutable
Fixes: #7925 The `MessageHistory.write()` creates not only a new instance of the `MessageHistory`, but also a new copy of the whole message. This significantly impacts the performance when we have too many components to track * Make `MessageHistory` as append-only container and create a new instance (plus message) only on the first track when no prior history is present * Change `WireTap`, `BroadcastingDispatcher`, `AbstractMessageRouter` and `AbstractMessageSplitter` to use a new `AbstractIntegrationMessageBuilder.cloneMessageHistoryIfAny()` API for every branch a message is produced. Essentially, create a new message with copy of the message history to let that downstream sub-flow have its own trace * Modify failed unit tests for a new logic where message history header is not immutable anymore * This also fixes an `AbstractMessageSplitter` for propagating its track into messages it emits * * Do not clone message history header if only one consume in multi-publish * Fix typos in docs
1 parent 3fbe917 commit 2731e94

File tree

12 files changed

+214
-134
lines changed

12 files changed

+214
-134
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,8 +23,11 @@
2323
import org.springframework.beans.factory.BeanFactory;
2424
import org.springframework.beans.factory.BeanFactoryAware;
2525
import org.springframework.integration.core.MessageSelector;
26+
import org.springframework.integration.history.MessageHistory;
27+
import org.springframework.integration.support.MessageBuilderFactory;
2628
import org.springframework.integration.support.channel.ChannelResolverUtils;
2729
import org.springframework.integration.support.management.ManageableLifecycle;
30+
import org.springframework.integration.support.utils.IntegrationUtils;
2831
import org.springframework.jmx.export.annotation.ManagedAttribute;
2932
import org.springframework.jmx.export.annotation.ManagedOperation;
3033
import org.springframework.jmx.export.annotation.ManagedResource;
@@ -57,6 +60,8 @@ public class WireTap implements ChannelInterceptor, ManageableLifecycle, VetoCap
5760

5861
private BeanFactory beanFactory;
5962

63+
private MessageBuilderFactory messageBuilderFactory;
64+
6065
private volatile boolean running = true;
6166

6267

@@ -162,9 +167,19 @@ public Message<?> preSend(Message<?> message, MessageChannel channel) {
162167
return message;
163168
}
164169
if (this.running && (this.selector == null || this.selector.accept(message))) {
165-
boolean sent = (this.timeout >= 0)
166-
? wireTapChannel.send(message, this.timeout)
167-
: wireTapChannel.send(message);
170+
Message<?> messageToSend = message;
171+
if (message.getHeaders().containsKey(MessageHistory.HEADER_NAME)) {
172+
messageToSend =
173+
getMessageBuilderFactory()
174+
.fromMessage(message)
175+
.cloneMessageHistoryIfAny()
176+
.build();
177+
}
178+
boolean sent =
179+
(this.timeout >= 0)
180+
? wireTapChannel.send(messageToSend, this.timeout)
181+
: wireTapChannel.send(messageToSend);
182+
168183
if (!sent && LOGGER.isWarnEnabled()) {
169184
LOGGER.warn("failed to send message to WireTap channel '" + wireTapChannel + "'");
170185
}
@@ -174,7 +189,6 @@ public Message<?> preSend(Message<?> message, MessageChannel channel) {
174189

175190
@Override
176191
public boolean shouldIntercept(String beanName, InterceptableChannel channel) {
177-
178192
return !getChannel().equals(channel);
179193
}
180194

@@ -189,4 +203,11 @@ private MessageChannel getChannel() {
189203
return this.channel;
190204
}
191205

206+
private MessageBuilderFactory getMessageBuilderFactory() {
207+
if (this.messageBuilderFactory == null) {
208+
this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
209+
}
210+
return this.messageBuilderFactory;
211+
}
212+
192213
}

spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,18 +17,23 @@
1717
package org.springframework.integration.dispatcher;
1818

1919
import java.util.Collection;
20+
import java.util.UUID;
2021
import java.util.concurrent.Executor;
2122

2223
import org.springframework.beans.BeansException;
2324
import org.springframework.beans.factory.BeanFactory;
2425
import org.springframework.beans.factory.BeanFactoryAware;
2526
import org.springframework.integration.MessageDispatchingException;
27+
import org.springframework.integration.context.IntegrationObjectSupport;
28+
import org.springframework.integration.history.MessageHistory;
29+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
2630
import org.springframework.integration.support.DefaultMessageBuilderFactory;
2731
import org.springframework.integration.support.MessageBuilderFactory;
2832
import org.springframework.integration.support.MessageDecorator;
2933
import org.springframework.integration.support.utils.IntegrationUtils;
3034
import org.springframework.messaging.Message;
3135
import org.springframework.messaging.MessageHandler;
36+
import org.springframework.messaging.MessageHeaders;
3237
import org.springframework.messaging.MessagingException;
3338
import org.springframework.messaging.support.MessageHandlingRunnable;
3439
import org.springframework.util.Assert;
@@ -153,13 +158,28 @@ public boolean dispatch(Message<?> message) {
153158
}
154159
int sequenceSize = handlers.size();
155160
Message<?> messageToSend = message;
161+
MessageHeaders messageHeaders = message.getHeaders();
162+
UUID correlationKey = messageHeaders.getId();
163+
boolean hasMessageHistory = messageHeaders.containsKey(MessageHistory.HEADER_NAME) && sequenceSize > 1;
156164
for (MessageHandler handler : handlers) {
157-
if (this.applySequence) {
158-
messageToSend = getMessageBuilderFactory()
159-
.fromMessage(message)
160-
.pushSequenceDetails(message.getHeaders().getId(), sequenceNumber++, sequenceSize)
161-
.build();
165+
if (this.applySequence || hasMessageHistory) {
166+
AbstractIntegrationMessageBuilder<?> builder =
167+
getMessageBuilderFactory()
168+
.fromMessage(message);
169+
170+
if (this.applySequence) {
171+
builder.pushSequenceDetails(
172+
correlationKey == null ? IntegrationObjectSupport.generateId() : correlationKey,
173+
sequenceNumber++, sequenceSize);
174+
}
175+
176+
if (hasMessageHistory) {
177+
builder.cloneMessageHistoryIfAny();
178+
}
179+
180+
messageToSend = builder.build();
162181
}
182+
163183
if (message instanceof MessageDecorator messageDecorator) {
164184
messageToSend = messageDecorator.decorateMessage(messageToSend);
165185
}

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collection;
2121
import java.util.Collections;
2222
import java.util.HashSet;
23+
import java.util.Iterator;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Set;
@@ -465,6 +466,13 @@ else if (output instanceof AbstractIntegrationMessageBuilder) {
465466
}
466467
else {
467468
builder = getMessageBuilderFactory().withPayload(output);
469+
// Assuming that message in the payload collection is a copy of request message.
470+
if (output instanceof Iterable<?> iterable) {
471+
Iterator<?> iterator = iterable.iterator();
472+
if (iterator.hasNext() && iterator.next() instanceof Message<?>) {
473+
builder = builder.cloneMessageHistoryIfAny();
474+
}
475+
}
468476
}
469477
if (!this.noHeadersPropagation &&
470478
(shouldCopyRequestHeaders() ||

spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistory.java

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.history;
1818

19+
import java.io.Serial;
1920
import java.io.Serializable;
2021
import java.util.ArrayList;
2122
import java.util.Collection;
@@ -52,8 +53,9 @@
5253
*
5354
* @since 2.0
5455
*/
55-
public final class MessageHistory implements List<Properties>, Serializable {
56+
public final class MessageHistory implements List<Properties>, Serializable, Cloneable {
5657

58+
@Serial
5759
private static final long serialVersionUID = -2340400235574314134L;
5860

5961
private static final Log LOGGER = LogFactory.getLog(MessageHistory.class);
@@ -94,51 +96,52 @@ public static <T> Message<T> write(Message<T> messageArg, NamedComponent compone
9496
Assert.notNull(component, "Component must not be null");
9597
Properties metadata = extractMetadata(component);
9698
if (!metadata.isEmpty()) {
97-
MessageHistory previousHistory = message.getHeaders().get(HEADER_NAME, MessageHistory.class);
98-
List<Properties> components =
99-
previousHistory != null
100-
? new ArrayList<>(previousHistory)
101-
: new ArrayList<>();
102-
components.add(metadata);
103-
MessageHistory history = new MessageHistory(components);
104-
105-
if (message instanceof MutableMessage) {
106-
message.getHeaders().put(HEADER_NAME, history);
99+
MessageHistory messageHistory = message.getHeaders().get(HEADER_NAME, MessageHistory.class);
100+
if (messageHistory != null) {
101+
messageHistory.components.add(metadata);
107102
}
108-
else if (message instanceof ErrorMessage) {
109-
ErrorMessage errorMessage = (ErrorMessage) message;
110-
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
111-
headerAccessor.setHeader(HEADER_NAME, history);
112-
Throwable payload = errorMessage.getPayload();
113-
Message<?> originalMessage = errorMessage.getOriginalMessage();
114-
if (originalMessage != null) {
115-
errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders(), originalMessage);
103+
else {
104+
List<Properties> components = new ArrayList<>();
105+
components.add(metadata);
106+
messageHistory = new MessageHistory(components);
107+
108+
if (message instanceof MutableMessage) {
109+
message.getHeaders().put(HEADER_NAME, messageHistory);
116110
}
117-
else {
118-
errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders());
111+
else if (message instanceof ErrorMessage errorMessage) {
112+
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
113+
headerAccessor.setHeader(HEADER_NAME, messageHistory);
114+
Throwable payload = errorMessage.getPayload();
115+
Message<?> originalMessage = errorMessage.getOriginalMessage();
116+
if (originalMessage != null) {
117+
errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders(), originalMessage);
118+
}
119+
else {
120+
errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders());
121+
}
122+
message = (Message<T>) errorMessage;
119123
}
120-
message = (Message<T>) errorMessage;
121-
}
122-
else if (message instanceof AdviceMessage) {
123-
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
124-
headerAccessor.setHeader(HEADER_NAME, history);
125-
message = new AdviceMessage<T>(message.getPayload(), headerAccessor.toMessageHeaders(),
126-
((AdviceMessage<?>) message).getInputMessage());
127-
}
128-
else {
129-
if (!(message instanceof GenericMessage) &&
130-
(messageBuilderFactory instanceof DefaultMessageBuilderFactory ||
131-
messageBuilderFactory instanceof MutableMessageBuilderFactory)
132-
&& LOGGER.isWarnEnabled()) {
133-
134-
LOGGER.warn("MessageHistory rebuilds the message and produces the result of the [" +
135-
messageBuilderFactory + "], not an instance of the provided type [" +
136-
message.getClass() + "]. Consider to supply a custom MessageBuilderFactory " +
137-
"to retain custom messages during MessageHistory tracking.");
124+
else if (message instanceof AdviceMessage<?> adviceMessage) {
125+
IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message);
126+
headerAccessor.setHeader(HEADER_NAME, messageHistory);
127+
message = new AdviceMessage<>(message.getPayload(), headerAccessor.toMessageHeaders(),
128+
adviceMessage.getInputMessage());
129+
}
130+
else {
131+
if (!(message instanceof GenericMessage) &&
132+
(messageBuilderFactory instanceof DefaultMessageBuilderFactory ||
133+
messageBuilderFactory instanceof MutableMessageBuilderFactory)
134+
&& LOGGER.isWarnEnabled()) {
135+
136+
LOGGER.warn("MessageHistory rebuilds the message and produces the result of the [" +
137+
messageBuilderFactory + "], not an instance of the provided type [" +
138+
message.getClass() + "]. Consider to supply a custom MessageBuilderFactory " +
139+
"to retain custom messages during MessageHistory tracking.");
140+
}
141+
message = messageBuilderFactory.fromMessage(message)
142+
.setHeader(HEADER_NAME, messageHistory)
143+
.build();
138144
}
139-
message = messageBuilderFactory.fromMessage(message)
140-
.setHeader(HEADER_NAME, history)
141-
.build();
142145
}
143146
}
144147
return message;
@@ -216,15 +219,19 @@ public int lastIndexOf(Object o) {
216219
return this.components.lastIndexOf(o);
217220
}
218221

222+
@Override
223+
public Object clone() {
224+
return new MessageHistory(new ArrayList<>(this.components));
225+
}
226+
219227
@Override
220228
public boolean equals(Object o) {
221229
if (this == o) {
222230
return true;
223231
}
224-
if (!(o instanceof MessageHistory)) {
232+
if (!(o instanceof MessageHistory that)) {
225233
return false;
226234
}
227-
MessageHistory that = (MessageHistory) o;
228235
return this.components.equals(that.components);
229236
}
230237

@@ -318,6 +325,7 @@ private static Properties extractMetadata(NamedComponent component) {
318325
*/
319326
public static class Entry extends Properties {
320327

328+
@Serial
321329
private static final long serialVersionUID = -8225834391885601079L;
322330

323331
public String getName() {

spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,8 @@
2727
import org.springframework.integration.IntegrationPatternType;
2828
import org.springframework.integration.core.MessagingTemplate;
2929
import org.springframework.integration.handler.AbstractMessageHandler;
30+
import org.springframework.integration.history.MessageHistory;
31+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3032
import org.springframework.integration.support.management.IntegrationManagedResource;
3133
import org.springframework.jmx.export.annotation.ManagedResource;
3234
import org.springframework.messaging.Message;
@@ -193,19 +195,27 @@ protected void handleMessageInternal(Message<?> message) {
193195
if (results != null) {
194196
int sequenceSize = results.size();
195197
int sequenceNumber = 1;
198+
Message<?> messageToSend = message;
199+
UUID correlationKey = message.getHeaders().getId();
200+
boolean hasMessageHistory = message.getHeaders().containsKey(MessageHistory.HEADER_NAME) && sequenceSize > 1;
196201
for (MessageChannel channel : results) {
197-
final Message<?> messageToSend;
198-
if (!this.applySequence) {
199-
messageToSend = message;
200-
}
201-
else {
202-
UUID id = message.getHeaders().getId();
203-
messageToSend = getMessageBuilderFactory()
204-
.fromMessage(message)
205-
.pushSequenceDetails(id == null ? generateId() : id,
206-
sequenceNumber++, sequenceSize)
207-
.build();
202+
if (this.applySequence || hasMessageHistory) {
203+
AbstractIntegrationMessageBuilder<?> builder =
204+
getMessageBuilderFactory()
205+
.fromMessage(message);
206+
207+
if (this.applySequence) {
208+
builder.pushSequenceDetails(correlationKey == null ? generateId() : correlationKey,
209+
sequenceNumber++, sequenceSize);
210+
}
211+
212+
if (hasMessageHistory) {
213+
builder.cloneMessageHistoryIfAny();
214+
}
215+
216+
messageToSend = builder.build();
208217
}
218+
209219
if (channel != null) {
210220
sent |= doSend(channel, messageToSend);
211221
}

spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,6 +35,7 @@
3535
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
3636
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
3737
import org.springframework.integration.handler.DiscardingMessageHandler;
38+
import org.springframework.integration.history.MessageHistory;
3839
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3940
import org.springframework.integration.support.json.JacksonPresent;
4041
import org.springframework.integration.util.FunctionIterator;
@@ -276,10 +277,14 @@ private AbstractIntegrationMessageBuilder<?> createBuilder(Object item, Map<Stri
276277
Object correlationId, int sequenceNumber, int sequenceSize) {
277278

278279
AbstractIntegrationMessageBuilder<?> builder = messageBuilderForReply(item);
279-
builder.copyHeadersIfAbsent(headers);
280+
builder.setHeader(MessageHistory.HEADER_NAME, headers.get(MessageHistory.HEADER_NAME))
281+
.copyHeadersIfAbsent(headers)
282+
.cloneMessageHistoryIfAny();
283+
280284
if (this.applySequence) {
281285
builder.pushSequenceDetails(correlationId, sequenceNumber, sequenceSize);
282286
}
287+
283288
return builder;
284289
}
285290

0 commit comments

Comments
 (0)