Skip to content

Commit 70e1287

Browse files
committed
INT-4457: Make log() in the end as reply-aware
JIRA: https://jira.spring.io/browse/INT-4457 When an `IntegrationFlow` is reply-based (we expect a reply in the beginning) and `log()` (or `wireTap()`) is used in the end, we are forced to add an empty `bridge()` in the end to ensure a `replyChannel` header routing * Fix `IntegrationFlowDefinition` to add `enrichHeaders()` in the end to populate a `nullChannel` as a `replyChannel` header if that is missed in the request message headers. This way we cover both use-cases when we expect reply from the flow and when it is used as a one-way scenario * Improve a `HeaderEnricher` do not create a new `Message` if there are no new headers to add/remove * Remove a note from the `dsl.adoc` about now redundant `bridge()` after `log()` * Resolve TODO in the `.handle()` paragraph
1 parent 34ac66d commit 70e1287

File tree

5 files changed

+161
-160
lines changed

5 files changed

+161
-160
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.dsl;
1818

19+
import java.util.Collections;
1920
import java.util.HashSet;
2021
import java.util.LinkedHashMap;
2122
import java.util.Map;
@@ -92,6 +93,7 @@
9293
import org.springframework.messaging.Message;
9394
import org.springframework.messaging.MessageChannel;
9495
import org.springframework.messaging.MessageHandler;
96+
import org.springframework.messaging.MessageHeaders;
9597
import org.springframework.util.Assert;
9698
import org.springframework.util.CollectionUtils;
9799
import org.springframework.util.StringUtils;
@@ -2740,7 +2742,9 @@ else if (this.currentMessageChannel != null) {
27402742
.stream()
27412743
.reduce((first, second) -> second);
27422744
if (lastComponent.get() instanceof WireTapSpec) {
2743-
channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
2745+
enrichHeaders(
2746+
Collections.singletonMap(
2747+
MessageHeaders.REPLY_CHANNEL, IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME));
27442748
}
27452749
}
27462750

spring-integration-core/src/main/java/org/springframework/integration/transformer/HeaderEnricher.java

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,11 +29,11 @@
2929
import org.springframework.beans.factory.InitializingBean;
3030
import org.springframework.integration.context.IntegrationObjectSupport;
3131
import org.springframework.integration.handler.MessageProcessor;
32+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3233
import org.springframework.integration.support.DefaultMessageBuilderFactory;
3334
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
3435
import org.springframework.messaging.Message;
3536
import org.springframework.messaging.MessageHeaders;
36-
import org.springframework.messaging.MessagingException;
3737

3838
/**
3939
* A Transformer that adds statically configured header values to a Message.
@@ -64,12 +64,13 @@ public HeaderEnricher() {
6464

6565
/**
6666
* Create a HeaderEnricher with the given map of headers.
67-
*
6867
* @param headersToAdd The headers to add.
6968
*/
7069
public HeaderEnricher(Map<String, ? extends HeaderValueMessageProcessor<?>> headersToAdd) {
71-
this.headersToAdd = (headersToAdd != null) ? headersToAdd
72-
: new HashMap<String, HeaderValueMessageProcessor<Object>>();
70+
this.headersToAdd =
71+
headersToAdd != null
72+
? headersToAdd
73+
: new HashMap<String, HeaderValueMessageProcessor<Object>>();
7374
}
7475

7576
public <T> void setMessageProcessor(MessageProcessor<T> messageProcessor) {
@@ -86,7 +87,6 @@ public void setDefaultOverwrite(boolean defaultOverwrite) {
8687
* <code>true</code>. Set this to <code>false</code> if a
8788
* <code>null</code> value should trigger <i>removal</i> of the
8889
* corresponding header instead.
89-
*
9090
* @param shouldSkipNulls true when null values should be skipped.
9191
*/
9292
public void setShouldSkipNulls(boolean shouldSkipNulls) {
@@ -96,52 +96,56 @@ public void setShouldSkipNulls(boolean shouldSkipNulls) {
9696

9797
@Override
9898
public String getComponentType() {
99-
return "transformer"; // backwards compatibility
99+
return "header-enricher";
100100
}
101101

102102
@Override
103103
public Message<?> transform(Message<?> message) {
104-
try {
105-
Map<String, Object> headerMap = new HashMap<String, Object>(message.getHeaders());
106-
this.addHeadersFromMessageProcessor(message, headerMap);
107-
for (Map.Entry<String, ? extends HeaderValueMessageProcessor<?>> entry : this.headersToAdd.entrySet()) {
108-
String key = entry.getKey();
109-
HeaderValueMessageProcessor<?> valueProcessor = entry.getValue();
110-
111-
Boolean shouldOverwrite = valueProcessor.isOverwrite();
112-
if (shouldOverwrite == null) {
113-
shouldOverwrite = this.defaultOverwrite;
114-
}
104+
MessageHeaders messageHeaders = message.getHeaders();
115105

116-
boolean headerDoesNotExist = headerMap.get(key) == null;
106+
AbstractIntegrationMessageBuilder<?> messageBuilder =
107+
getMessageBuilderFactory()
108+
.fromMessage(message);
117109

118-
/*
119-
* Only evaluate value expression if necessary
120-
*/
121-
if (headerDoesNotExist || shouldOverwrite) {
122-
Object value = valueProcessor.processMessage(message);
123-
if (value != null || !this.shouldSkipNulls) {
124-
headerMap.put(key, value);
125-
}
110+
addHeadersFromMessageProcessor(message, messageBuilder);
111+
for (Map.Entry<String, ? extends HeaderValueMessageProcessor<?>> entry : this.headersToAdd.entrySet()) {
112+
String key = entry.getKey();
113+
HeaderValueMessageProcessor<?> valueProcessor = entry.getValue();
114+
115+
Boolean shouldOverwrite = valueProcessor.isOverwrite();
116+
if (shouldOverwrite == null) {
117+
shouldOverwrite = this.defaultOverwrite;
118+
}
119+
120+
boolean headerDoesNotExist = messageHeaders.get(key) == null;
121+
122+
/*
123+
* Only evaluate value expression if necessary
124+
*/
125+
if (headerDoesNotExist || shouldOverwrite) {
126+
Object value = valueProcessor.processMessage(message);
127+
if (value != null || !this.shouldSkipNulls) {
128+
messageBuilder.setHeader(key, value);
126129
}
127130
}
128-
return this.getMessageBuilderFactory().withPayload(message.getPayload()).copyHeaders(headerMap).build();
129-
}
130-
catch (Exception e) {
131-
throw new MessagingException(message, "failed to transform message headers", e);
132131
}
132+
133+
return messageBuilder.build();
133134
}
134135

135-
private void addHeadersFromMessageProcessor(Message<?> message, Map<String, Object> headerMap) {
136+
private void addHeadersFromMessageProcessor(Message<?> message,
137+
AbstractIntegrationMessageBuilder<?> messageBuilder) {
138+
136139
if (this.messageProcessor != null) {
137140
Object result = this.messageProcessor.processMessage(message);
138141
if (result instanceof Map) {
142+
MessageHeaders messageHeaders = message.getHeaders();
139143
Map<?, ?> resultMap = (Map<?, ?>) result;
140144
for (Entry<?, ?> entry : resultMap.entrySet()) {
141145
Object key = entry.getKey();
142146
if (key instanceof String) {
143-
if (this.defaultOverwrite || headerMap.get(key) == null) {
144-
headerMap.put((String) key, entry.getValue());
147+
if (this.defaultOverwrite || messageHeaders.get(key) == null) {
148+
messageBuilder.setHeader((String) key, entry.getValue());
145149
}
146150
}
147151
else if (logger.isDebugEnabled()) {

spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,7 @@ public static class ContextConfiguration {
133133
@Bean
134134
public IntegrationFlow testGateway() {
135135
return f -> f.gateway("processChannel", g -> g.replyChannel("replyChannel"))
136-
.log()
137-
.bridge();
136+
.log();
138137
}
139138

140139
@Bean

0 commit comments

Comments
 (0)