From 1dd0d17e5fae9fec7f4094c01f2a90f1c1347278 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 4 Mar 2024 17:31:37 -0500 Subject: [PATCH 1/2] GH-7925: Make message history header as mutable Fixes: #7925 The `MessageHistory.write()` creates not only a new instance of the `MessageHistory`, but also a new copy of the whole message. This significantly impacts the performance when we have too many components to track * Make `MessageHistory` as append-only container and create a new instance (plus message) only on the first track when no prior history is present * Change `WireTap`, `BroadcastingDispatcher`, `AbstractMessageRouter` and `AbstractMessageSplitter` to use a new `AbstractIntegrationMessageBuilder.cloneMessageHistoryIfAny()` API for every branch a message is produced. Essentially, create a new message with copy of the message history to let that downstream sub-flow have its own trace * Modify failed unit tests for a new logic where message history header is not immutable anymore * This also fixes an `AbstractMessageSplitter` for propagating its track into messages it emits --- .../channel/interceptor/WireTap.java | 31 +++++- .../dispatcher/BroadcastingDispatcher.java | 28 ++++-- .../AbstractMessageProducingHandler.java | 8 ++ .../integration/history/MessageHistory.java | 98 ++++++++++--------- .../router/AbstractMessageRouter.java | 30 +++--- .../splitter/AbstractMessageSplitter.java | 9 +- .../AbstractIntegrationMessageBuilder.java | 19 +++- ...ingAnnotationsWithBeanAnnotationTests.java | 8 +- .../integration/core/MessageHistoryTests.java | 14 +-- .../BroadcastingDispatcherTests.java | 81 +++++++-------- .../modules/ROOT/pages/message-history.adoc | 10 +- .../antora/modules/ROOT/pages/whats-new.adoc | 4 + 12 files changed, 206 insertions(+), 134 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java b/spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java index 038b4b6d559..4b6005a9130 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/channel/interceptor/WireTap.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,8 +23,11 @@ import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.integration.core.MessageSelector; +import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.integration.support.channel.ChannelResolverUtils; import org.springframework.integration.support.management.ManageableLifecycle; +import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedResource; @@ -57,6 +60,8 @@ public class WireTap implements ChannelInterceptor, ManageableLifecycle, VetoCap private BeanFactory beanFactory; + private MessageBuilderFactory messageBuilderFactory; + private volatile boolean running = true; @@ -162,9 +167,19 @@ public Message preSend(Message message, MessageChannel channel) { return message; } if (this.running && (this.selector == null || this.selector.accept(message))) { - boolean sent = (this.timeout >= 0) - ? wireTapChannel.send(message, this.timeout) - : wireTapChannel.send(message); + Message messageToSend = message; + if (message.getHeaders().containsKey(MessageHistory.HEADER_NAME)) { + messageToSend = + getMessageBuilderFactory() + .fromMessage(message) + .cloneMessageHistoryIfAny() + .build(); + } + boolean sent = + (this.timeout >= 0) + ? wireTapChannel.send(messageToSend, this.timeout) + : wireTapChannel.send(messageToSend); + if (!sent && LOGGER.isWarnEnabled()) { LOGGER.warn("failed to send message to WireTap channel '" + wireTapChannel + "'"); } @@ -174,7 +189,6 @@ public Message preSend(Message message, MessageChannel channel) { @Override public boolean shouldIntercept(String beanName, InterceptableChannel channel) { - return !getChannel().equals(channel); } @@ -189,4 +203,11 @@ private MessageChannel getChannel() { return this.channel; } + private MessageBuilderFactory getMessageBuilderFactory() { + if (this.messageBuilderFactory == null) { + this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory); + } + return this.messageBuilderFactory; + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java index 43dec106abb..4e39e2617bd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,18 +17,23 @@ package org.springframework.integration.dispatcher; import java.util.Collection; +import java.util.UUID; import java.util.concurrent.Executor; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.BeanFactoryAware; import org.springframework.integration.MessageDispatchingException; +import org.springframework.integration.context.IntegrationObjectSupport; +import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.integration.support.MessageDecorator; import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; import org.springframework.messaging.support.MessageHandlingRunnable; import org.springframework.util.Assert; @@ -153,13 +158,24 @@ public boolean dispatch(Message message) { } int sequenceSize = handlers.size(); Message messageToSend = message; + MessageHeaders messageHeaders = message.getHeaders(); + UUID correlationKey = messageHeaders.getId(); + boolean hasMessageHistory = messageHeaders.containsKey(MessageHistory.HEADER_NAME); for (MessageHandler handler : handlers) { - if (this.applySequence) { - messageToSend = getMessageBuilderFactory() - .fromMessage(message) - .pushSequenceDetails(message.getHeaders().getId(), sequenceNumber++, sequenceSize) - .build(); + if (this.applySequence || hasMessageHistory) { + AbstractIntegrationMessageBuilder builder = + getMessageBuilderFactory() + .fromMessage(message); + + if (this.applySequence) { + builder.pushSequenceDetails( + correlationKey == null ? IntegrationObjectSupport.generateId() : correlationKey, + sequenceNumber++, sequenceSize); + } + + messageToSend = builder.cloneMessageHistoryIfAny().build(); } + if (message instanceof MessageDecorator messageDecorator) { messageToSend = messageDecorator.decorateMessage(messageToSend); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java index 6bf0d3432ed..021b81e714c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -465,6 +466,13 @@ else if (output instanceof AbstractIntegrationMessageBuilder) { } else { builder = getMessageBuilderFactory().withPayload(output); + // Assuming that message in the payload collection is a copy of request message. + if (output instanceof Iterable iterable) { + Iterator iterator = iterable.iterator(); + if (iterator.hasNext() && iterator.next() instanceof Message) { + builder = builder.cloneMessageHistoryIfAny(); + } + } } if (!this.noHeadersPropagation && (shouldCopyRequestHeaders() || diff --git a/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistory.java b/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistory.java index 63ce9737c75..2dfb4bba942 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistory.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/history/MessageHistory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.integration.history; +import java.io.Serial; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -52,8 +53,9 @@ * * @since 2.0 */ -public final class MessageHistory implements List, Serializable { +public final class MessageHistory implements List, Serializable, Cloneable { + @Serial private static final long serialVersionUID = -2340400235574314134L; private static final Log LOGGER = LogFactory.getLog(MessageHistory.class); @@ -94,51 +96,52 @@ public static Message write(Message messageArg, NamedComponent compone Assert.notNull(component, "Component must not be null"); Properties metadata = extractMetadata(component); if (!metadata.isEmpty()) { - MessageHistory previousHistory = message.getHeaders().get(HEADER_NAME, MessageHistory.class); - List components = - previousHistory != null - ? new ArrayList<>(previousHistory) - : new ArrayList<>(); - components.add(metadata); - MessageHistory history = new MessageHistory(components); - - if (message instanceof MutableMessage) { - message.getHeaders().put(HEADER_NAME, history); + MessageHistory messageHistory = message.getHeaders().get(HEADER_NAME, MessageHistory.class); + if (messageHistory != null) { + messageHistory.components.add(metadata); } - else if (message instanceof ErrorMessage) { - ErrorMessage errorMessage = (ErrorMessage) message; - IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message); - headerAccessor.setHeader(HEADER_NAME, history); - Throwable payload = errorMessage.getPayload(); - Message originalMessage = errorMessage.getOriginalMessage(); - if (originalMessage != null) { - errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders(), originalMessage); + else { + List components = new ArrayList<>(); + components.add(metadata); + messageHistory = new MessageHistory(components); + + if (message instanceof MutableMessage) { + message.getHeaders().put(HEADER_NAME, messageHistory); } - else { - errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders()); + else if (message instanceof ErrorMessage errorMessage) { + IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message); + headerAccessor.setHeader(HEADER_NAME, messageHistory); + Throwable payload = errorMessage.getPayload(); + Message originalMessage = errorMessage.getOriginalMessage(); + if (originalMessage != null) { + errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders(), originalMessage); + } + else { + errorMessage = new ErrorMessage(payload, headerAccessor.toMessageHeaders()); + } + message = (Message) errorMessage; } - message = (Message) errorMessage; - } - else if (message instanceof AdviceMessage) { - IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message); - headerAccessor.setHeader(HEADER_NAME, history); - message = new AdviceMessage(message.getPayload(), headerAccessor.toMessageHeaders(), - ((AdviceMessage) message).getInputMessage()); - } - else { - if (!(message instanceof GenericMessage) && - (messageBuilderFactory instanceof DefaultMessageBuilderFactory || - messageBuilderFactory instanceof MutableMessageBuilderFactory) - && LOGGER.isWarnEnabled()) { - - LOGGER.warn("MessageHistory rebuilds the message and produces the result of the [" + - messageBuilderFactory + "], not an instance of the provided type [" + - message.getClass() + "]. Consider to supply a custom MessageBuilderFactory " + - "to retain custom messages during MessageHistory tracking."); + else if (message instanceof AdviceMessage adviceMessage) { + IntegrationMessageHeaderAccessor headerAccessor = new IntegrationMessageHeaderAccessor(message); + headerAccessor.setHeader(HEADER_NAME, messageHistory); + message = new AdviceMessage<>(message.getPayload(), headerAccessor.toMessageHeaders(), + adviceMessage.getInputMessage()); + } + else { + if (!(message instanceof GenericMessage) && + (messageBuilderFactory instanceof DefaultMessageBuilderFactory || + messageBuilderFactory instanceof MutableMessageBuilderFactory) + && LOGGER.isWarnEnabled()) { + + LOGGER.warn("MessageHistory rebuilds the message and produces the result of the [" + + messageBuilderFactory + "], not an instance of the provided type [" + + message.getClass() + "]. Consider to supply a custom MessageBuilderFactory " + + "to retain custom messages during MessageHistory tracking."); + } + message = messageBuilderFactory.fromMessage(message) + .setHeader(HEADER_NAME, messageHistory) + .build(); } - message = messageBuilderFactory.fromMessage(message) - .setHeader(HEADER_NAME, history) - .build(); } } return message; @@ -216,15 +219,19 @@ public int lastIndexOf(Object o) { return this.components.lastIndexOf(o); } + @Override + public Object clone() { + return new MessageHistory(new ArrayList<>(this.components)); + } + @Override public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof MessageHistory)) { + if (!(o instanceof MessageHistory that)) { return false; } - MessageHistory that = (MessageHistory) o; return this.components.equals(that.components); } @@ -318,6 +325,7 @@ private static Properties extractMetadata(NamedComponent component) { */ public static class Entry extends Properties { + @Serial private static final long serialVersionUID = -8225834391885601079L; public String getName() { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java b/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java index a174708c957..55116316b4f 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,8 @@ import org.springframework.integration.IntegrationPatternType; import org.springframework.integration.core.MessagingTemplate; import org.springframework.integration.handler.AbstractMessageHandler; +import org.springframework.integration.history.MessageHistory; +import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.management.IntegrationManagedResource; import org.springframework.jmx.export.annotation.ManagedResource; import org.springframework.messaging.Message; @@ -193,19 +195,23 @@ protected void handleMessageInternal(Message message) { if (results != null) { int sequenceSize = results.size(); int sequenceNumber = 1; + Message messageToSend = message; + UUID correlationKey = message.getHeaders().getId(); + boolean hasMessageHistory = message.getHeaders().containsKey(MessageHistory.HEADER_NAME); for (MessageChannel channel : results) { - final Message messageToSend; - if (!this.applySequence) { - messageToSend = message; - } - else { - UUID id = message.getHeaders().getId(); - messageToSend = getMessageBuilderFactory() - .fromMessage(message) - .pushSequenceDetails(id == null ? generateId() : id, - sequenceNumber++, sequenceSize) - .build(); + if (this.applySequence || hasMessageHistory) { + AbstractIntegrationMessageBuilder builder = + getMessageBuilderFactory() + .fromMessage(message); + + if (this.applySequence) { + builder.pushSequenceDetails(correlationKey == null ? generateId() : correlationKey, + sequenceNumber++, sequenceSize); + } + + messageToSend = builder.cloneMessageHistoryIfAny().build(); } + if (channel != null) { sent |= doSend(channel, messageToSend); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java b/spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java index 92f1a616206..b6717c293bd 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/splitter/AbstractMessageSplitter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel; import org.springframework.integration.handler.AbstractReplyProducingMessageHandler; import org.springframework.integration.handler.DiscardingMessageHandler; +import org.springframework.integration.history.MessageHistory; import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.json.JacksonPresent; import org.springframework.integration.util.FunctionIterator; @@ -276,10 +277,14 @@ private AbstractIntegrationMessageBuilder createBuilder(Object item, Map builder = messageBuilderForReply(item); - builder.copyHeadersIfAbsent(headers); + builder.setHeader(MessageHistory.HEADER_NAME, headers.get(MessageHistory.HEADER_NAME)) + .copyHeadersIfAbsent(headers) + .cloneMessageHistoryIfAny(); + if (this.applySequence) { builder.pushSequenceDetails(correlationId, sequenceNumber, sequenceSize); } + return builder; } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/support/AbstractIntegrationMessageBuilder.java b/spring-integration-core/src/main/java/org/springframework/integration/support/AbstractIntegrationMessageBuilder.java index d16f7dc8d28..393cfb1822c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/support/AbstractIntegrationMessageBuilder.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/support/AbstractIntegrationMessageBuilder.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2021 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,6 +25,7 @@ import java.util.Map; import org.springframework.integration.IntegrationMessageHeaderAccessor; +import org.springframework.integration.history.MessageHistory; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -165,6 +166,22 @@ public AbstractIntegrationMessageBuilder filterAndCopyHeadersIfAbsent(Map cloneMessageHistoryIfAny() { + MessageHistory messageHistory = getHeader(MessageHistory.HEADER_NAME, MessageHistory.class); + + if (messageHistory != null) { + return removeHeader(MessageHistory.HEADER_NAME) + .setHeader(MessageHistory.HEADER_NAME, messageHistory.clone()); + } + + return this; + } + @Nullable protected abstract List> getSequenceDetails(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java index 290d1f96bbc..71ad8049ad4 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2022 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -186,10 +186,8 @@ public void testMessagingAnnotationsFlow() throws InterruptedException { assertThat(messageHistory).isNotNull(); String messageHistoryString = messageHistory.toString(); assertThat(messageHistoryString) - .contains("routerChannel", "filterChannel", "aggregatorChannel", "serviceChannel") - .doesNotContain("discardChannel") - // history header is not overridden in splitter for individual message from message group emitted before - .doesNotContain("splitterChannel"); + .contains("routerChannel", "filterChannel", "aggregatorChannel", "splitterChannel", "serviceChannel") + .doesNotContain("discardChannel"); } assertThat(this.skippedServiceActivator).isNull(); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/core/MessageHistoryTests.java b/spring-integration-core/src/test/java/org/springframework/integration/core/MessageHistoryTests.java index 392029e7830..f5de05e838d 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/core/MessageHistoryTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/core/MessageHistoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -96,7 +96,7 @@ public void testCorrectErrorMessageAfterWrite() { Message result2 = MessageHistory.write(result1, new TestComponent(2)); assertThat(result2).isInstanceOf(ErrorMessage.class); assertThat(result2).isNotSameAs(original); - assertThat(result2).isNotSameAs(result1); + assertThat(result2).isSameAs(result1); assertThat(result2.getPayload()).isSameAs(original.getPayload()); assertThat(result1).extracting("originalMessage").isSameAs(originalMessage); MessageHistory history2 = MessageHistory.read(result2); @@ -122,20 +122,14 @@ public void testCorrectAdviceMessageAfterWrite() { assertThat(result2).isNotSameAs(original); assertThat(result2.getPayload()).isSameAs(original.getPayload()); assertThat(((AdviceMessage) result2).getInputMessage()).isSameAs(original.getInputMessage()); - assertThat(result2).isNotSameAs(result1); + assertThat(result2).isSameAs(result1); MessageHistory history2 = MessageHistory.read(result2); assertThat(history2).isNotNull(); assertThat(history2.toString()).isEqualTo("testComponent-1,testComponent-2"); } - private static class TestComponent implements NamedComponent { - - private final int id; - - TestComponent(int id) { - this.id = id; - } + private record TestComponent(int id) implements NamedComponent { @Override public String getComponentName() { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java index 38fd8bec788..26406ed4c5b 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/BroadcastingDispatcherTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,8 +21,8 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; import org.springframework.core.task.TaskExecutor; @@ -35,10 +35,12 @@ import org.springframework.messaging.support.GenericMessage; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.BDDMockito.given; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; /** @@ -52,26 +54,27 @@ public class BroadcastingDispatcherTests { private BroadcastingDispatcher dispatcher; - private final TaskExecutor taskExecutorMock = Mockito.mock(TaskExecutor.class); + private final TaskExecutor taskExecutorMock = Mockito.mock(); - private final Message messageMock = Mockito.mock(Message.class); + private final Message messageMock = Mockito.mock(); - private final MessageHandler targetMock1 = Mockito.mock(MessageHandler.class); + private final MessageHandler targetMock1 = Mockito.mock(); - private final MessageHandler targetMock2 = Mockito.mock(MessageHandler.class); + private final MessageHandler targetMock2 = Mockito.mock(); - private final MessageHandler targetMock3 = Mockito.mock(MessageHandler.class); + private final MessageHandler targetMock3 = Mockito.mock(); - @Before + @BeforeEach public void init() { Mockito.reset(taskExecutorMock, messageMock, taskExecutorMock, targetMock1, targetMock2, targetMock3); + given(messageMock.getHeaders()).willReturn(mock()); defaultTaskExecutorMock(); } @Test - public void singleTargetWithoutTaskExecutor() throws Exception { + public void singleTargetWithoutTaskExecutor() { dispatcher = new BroadcastingDispatcher(); dispatcher.addHandler(targetMock1); dispatcher.dispatch(messageMock); @@ -79,7 +82,7 @@ public void singleTargetWithoutTaskExecutor() throws Exception { } @Test - public void singleTargetWithTaskExecutor() throws Exception { + public void singleTargetWithTaskExecutor() { dispatcher = new BroadcastingDispatcher(taskExecutorMock); dispatcher.addHandler(targetMock1); dispatcher.dispatch(messageMock); @@ -202,12 +205,12 @@ public void removeConsumerBetweenSends() { @Test public void applySequenceDisabledByDefault() { BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(); - final List> messages = Collections.synchronizedList(new ArrayList>()); + final List> messages = Collections.synchronizedList(new ArrayList<>()); MessageHandler target1 = new MessageStoringTestEndpoint(messages); MessageHandler target2 = new MessageStoringTestEndpoint(messages); dispatcher.addHandler(target1); dispatcher.addHandler(target2); - dispatcher.dispatch(new GenericMessage("test")); + dispatcher.dispatch(new GenericMessage<>("test")); assertThat(messages.size()).isEqualTo(2); assertThat(new IntegrationMessageHeaderAccessor(messages.get(0)).getSequenceNumber()).isEqualTo(0); assertThat(new IntegrationMessageHeaderAccessor(messages.get(0)).getSequenceSize()).isEqualTo(0); @@ -219,14 +222,14 @@ public void applySequenceDisabledByDefault() { public void applySequenceEnabled() { BroadcastingDispatcher dispatcher = new BroadcastingDispatcher(); dispatcher.setApplySequence(true); - final List> messages = Collections.synchronizedList(new ArrayList>()); + final List> messages = Collections.synchronizedList(new ArrayList<>()); MessageHandler target1 = new MessageStoringTestEndpoint(messages); MessageHandler target2 = new MessageStoringTestEndpoint(messages); MessageHandler target3 = new MessageStoringTestEndpoint(messages); dispatcher.addHandler(target1); dispatcher.addHandler(target2); dispatcher.addHandler(target3); - Message inputMessage = new GenericMessage("test"); + Message inputMessage = new GenericMessage<>("test"); Object originalId = inputMessage.getHeaders().getId(); dispatcher.dispatch(inputMessage); assertThat(messages.size()).isEqualTo(3); @@ -251,13 +254,11 @@ public void testExceptionEnhancement() { dispatcher.addHandler(targetMock1); doThrow(new MessagingException("Mock Exception")) .when(targetMock1).handleMessage(eq(messageMock)); - try { - dispatcher.dispatch(messageMock); - fail("Expected Exception"); - } - catch (MessagingException e) { - assertThat(e.getFailedMessage()).isEqualTo(messageMock); - } + + assertThatExceptionOfType(MessagingException.class) + .isThrownBy(() -> dispatcher.dispatch(messageMock)) + .extracting(MessagingException::getFailedMessage) + .isEqualTo(messageMock); } /** @@ -272,13 +273,11 @@ public void testNoExceptionEnhancement() { Message dontReplaceThisMessage = MessageBuilder.withPayload("x").build(); doThrow(new MessagingException(dontReplaceThisMessage, "Mock Exception")) .when(targetMock1).handleMessage(eq(messageMock)); - try { - dispatcher.dispatch(messageMock); - fail("Expected Exception"); - } - catch (MessagingException e) { - assertThat(e.getFailedMessage()).isEqualTo(dontReplaceThisMessage); - } + + assertThatExceptionOfType(MessagingException.class) + .isThrownBy(() -> dispatcher.dispatch(messageMock)) + .extracting(MessagingException::getFailedMessage) + .isEqualTo(dontReplaceThisMessage); } /** @@ -306,13 +305,10 @@ public void testNoHandlerWithExecutor() { @Test public void testNoHandlerWithRequiredSubscriber() { dispatcher = new BroadcastingDispatcher(true); - try { - dispatcher.dispatch(messageMock); - fail("Expected Exception"); - } - catch (MessageDispatchingException exception) { - assertThat(exception.getFailedMessage()).isEqualTo(messageMock); - } + assertThatExceptionOfType(MessageDispatchingException.class) + .isThrownBy(() -> dispatcher.dispatch(messageMock)) + .extracting(MessagingException::getFailedMessage) + .isEqualTo(messageMock); } /** @@ -322,13 +318,10 @@ public void testNoHandlerWithRequiredSubscriber() { @Test public void testNoHandlerWithExecutorWithRequiredSubscriber() { dispatcher = new BroadcastingDispatcher(taskExecutorMock, true); - try { - dispatcher.dispatch(messageMock); - fail("Expected Exception"); - } - catch (MessageDispatchingException exception) { - assertThat(exception.getFailedMessage()).isEqualTo(messageMock); - } + assertThatExceptionOfType(MessageDispatchingException.class) + .isThrownBy(() -> dispatcher.dispatch(messageMock)) + .extracting(MessagingException::getFailedMessage) + .isEqualTo(messageMock); } private void defaultTaskExecutorMock() { diff --git a/src/reference/antora/modules/ROOT/pages/message-history.adoc b/src/reference/antora/modules/ROOT/pages/message-history.adoc index da6a209e3dd..f159b9653d2 100644 --- a/src/reference/antora/modules/ROOT/pages/message-history.adoc +++ b/src/reference/antora/modules/ROOT/pages/message-history.adoc @@ -129,7 +129,9 @@ The MBean's object name is `:name=messageHistoryConfigurer,type=MessageH IMPORTANT: Only one `@EnableMessageHistory` (or ``) must be declared in the application context as single source for components tracking configuration. Do not use a generic bean definition for the `MessageHistoryConfigurer`. -NOTE: By definition, the message history header is immutable (you cannot re-write history). -Therefore, when writing message history values, the components either create new messages (when the component is an origin) or they copy the history from a request message, modifying it and setting the new list on a reply message. -In either case, the values can be appended even if the message itself is crossing thread boundaries. -That means that the history values can greatly simplify debugging in an asynchronous message flow. +NOTE: Prior to version 6.3, the message history header was immutable (you cannot re-write history): every single track created not only new instance of the `MessageHistory`, but a fully new message copy. +Now it works in append-only mode: the first track creates a new message with a new `MessageHistory` container. +All the rest `MessageHistory.write()` calls add new entries to existing header - and no new message created. +This significantly improves an application performance. +All the components in the framework, where same message can be sent to several consumers (`PublishSubscribeChannel`, `AbstractMessageRouter`, `WireTap` etc.) (or splitter produces several outputs based on input message), are now cloning an existing `MessageHistory` header into those new messages. +For any other multi-producing use-cases, outside the framework scope, the `AbstractIntegrationMessageBuilder.cloneMessageHistoryIfAny()` API is recommended to ensure that parallel downstream sub-flows contribute their own message history traces. \ No newline at end of file diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index 11ff43c9d2d..73400235dd9 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -19,6 +19,10 @@ In general the project has been moved to the latest dependency versions. [[x6.3-general]] === General Changes +The `MessageHistory` header is now mutable, append-only container. +And all the subsequent tracks don't create new message - only their entry is added to existing message history header. +See xref:message-history.adoc[Message History Chapter] for more information. + [[x6.3-security-changes]] === Security Support Changes From ffa2c7bb9b08d4ad383ba970a961a3db95835de5 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 5 Mar 2024 13:11:14 -0500 Subject: [PATCH 2/2] * Do not clone message history header if only one consume in multi-publish * Fix typos in docs --- .../integration/dispatcher/BroadcastingDispatcher.java | 8 ++++++-- .../integration/router/AbstractMessageRouter.java | 8 ++++++-- .../antora/modules/ROOT/pages/message-history.adoc | 4 ++-- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java index 4e39e2617bd..9f025a2e86d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dispatcher/BroadcastingDispatcher.java @@ -160,7 +160,7 @@ public boolean dispatch(Message message) { Message messageToSend = message; MessageHeaders messageHeaders = message.getHeaders(); UUID correlationKey = messageHeaders.getId(); - boolean hasMessageHistory = messageHeaders.containsKey(MessageHistory.HEADER_NAME); + boolean hasMessageHistory = messageHeaders.containsKey(MessageHistory.HEADER_NAME) && sequenceSize > 1; for (MessageHandler handler : handlers) { if (this.applySequence || hasMessageHistory) { AbstractIntegrationMessageBuilder builder = @@ -173,7 +173,11 @@ public boolean dispatch(Message message) { sequenceNumber++, sequenceSize); } - messageToSend = builder.cloneMessageHistoryIfAny().build(); + if (hasMessageHistory) { + builder.cloneMessageHistoryIfAny(); + } + + messageToSend = builder.build(); } if (message instanceof MessageDecorator messageDecorator) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java b/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java index 55116316b4f..6f044c94464 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/router/AbstractMessageRouter.java @@ -197,7 +197,7 @@ protected void handleMessageInternal(Message message) { int sequenceNumber = 1; Message messageToSend = message; UUID correlationKey = message.getHeaders().getId(); - boolean hasMessageHistory = message.getHeaders().containsKey(MessageHistory.HEADER_NAME); + boolean hasMessageHistory = message.getHeaders().containsKey(MessageHistory.HEADER_NAME) && sequenceSize > 1; for (MessageChannel channel : results) { if (this.applySequence || hasMessageHistory) { AbstractIntegrationMessageBuilder builder = @@ -209,7 +209,11 @@ protected void handleMessageInternal(Message message) { sequenceNumber++, sequenceSize); } - messageToSend = builder.cloneMessageHistoryIfAny().build(); + if (hasMessageHistory) { + builder.cloneMessageHistoryIfAny(); + } + + messageToSend = builder.build(); } if (channel != null) { diff --git a/src/reference/antora/modules/ROOT/pages/message-history.adoc b/src/reference/antora/modules/ROOT/pages/message-history.adoc index f159b9653d2..2aea2f51743 100644 --- a/src/reference/antora/modules/ROOT/pages/message-history.adoc +++ b/src/reference/antora/modules/ROOT/pages/message-history.adoc @@ -132,6 +132,6 @@ Do not use a generic bean definition for the `MessageHistoryConfigurer`. NOTE: Prior to version 6.3, the message history header was immutable (you cannot re-write history): every single track created not only new instance of the `MessageHistory`, but a fully new message copy. Now it works in append-only mode: the first track creates a new message with a new `MessageHistory` container. All the rest `MessageHistory.write()` calls add new entries to existing header - and no new message created. -This significantly improves an application performance. -All the components in the framework, where same message can be sent to several consumers (`PublishSubscribeChannel`, `AbstractMessageRouter`, `WireTap` etc.) (or splitter produces several outputs based on input message), are now cloning an existing `MessageHistory` header into those new messages. +This significantly improves the application performance. +All the components in the framework, where same message can be sent to several consumers (`PublishSubscribeChannel`, `AbstractMessageRouter`, `WireTap` etc.), or splitter produces several outputs based on the input message, are now cloning an existing `MessageHistory` header into those new messages. For any other multi-producing use-cases, outside the framework scope, the `AbstractIntegrationMessageBuilder.cloneMessageHistoryIfAny()` API is recommended to ensure that parallel downstream sub-flows contribute their own message history traces. \ No newline at end of file