Skip to content

Commit 055e9a4

Browse files
artembilangaryrussell
authored andcommitted
INT-4457: Add logAndReply() terminal operator
JIRA: https://jira.spring.io/browse/INT-4457 When an `IntegrationFlow` is reply-based (we expect a reply in the beginning) and `log()` (or `wireTap()`) is used in the end, we are forced to add an empty `bridge()` in the end to ensure a `replyChannel` header routing * Fix `IntegrationFlowDefinition` to add `enrichHeaders()` in the end to populate a `nullChannel` as a `replyChannel` header if that is missed in the request message headers. This way we cover both use-cases when we expect reply from the flow and when it is used as a one-way scenario * Improve a `HeaderEnricher` do not create a new `Message` if there are no new headers to add/remove * Remove a note from the `dsl.adoc` about now redundant `bridge()` after `log()` * Resolve TODO in the `.handle()` paragraph * Restore anonymous class in the MessageHistoryIntegrationTests: Mockito can't mock/spy lambdas because they are `final` classes * Introduce `IntegrationFlowDefinition.logAndReply()` operator * Fix logging message Doc Polishing
1 parent 4a85849 commit 055e9a4

File tree

7 files changed

+387
-73
lines changed

7 files changed

+387
-73
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java

Lines changed: 298 additions & 2 deletions
Large diffs are not rendered by default.

spring-integration-core/src/main/java/org/springframework/integration/transformer/HeaderEnricher.java

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,11 +29,11 @@
2929
import org.springframework.beans.factory.InitializingBean;
3030
import org.springframework.integration.context.IntegrationObjectSupport;
3131
import org.springframework.integration.handler.MessageProcessor;
32+
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
3233
import org.springframework.integration.support.DefaultMessageBuilderFactory;
3334
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
3435
import org.springframework.messaging.Message;
3536
import org.springframework.messaging.MessageHeaders;
36-
import org.springframework.messaging.MessagingException;
3737

3838
/**
3939
* A Transformer that adds statically configured header values to a Message.
@@ -64,12 +64,13 @@ public HeaderEnricher() {
6464

6565
/**
6666
* Create a HeaderEnricher with the given map of headers.
67-
*
6867
* @param headersToAdd The headers to add.
6968
*/
7069
public HeaderEnricher(Map<String, ? extends HeaderValueMessageProcessor<?>> headersToAdd) {
71-
this.headersToAdd = (headersToAdd != null) ? headersToAdd
72-
: new HashMap<String, HeaderValueMessageProcessor<Object>>();
70+
this.headersToAdd =
71+
headersToAdd != null
72+
? headersToAdd
73+
: new HashMap<String, HeaderValueMessageProcessor<Object>>();
7374
}
7475

7576
public <T> void setMessageProcessor(MessageProcessor<T> messageProcessor) {
@@ -86,7 +87,6 @@ public void setDefaultOverwrite(boolean defaultOverwrite) {
8687
* <code>true</code>. Set this to <code>false</code> if a
8788
* <code>null</code> value should trigger <i>removal</i> of the
8889
* corresponding header instead.
89-
*
9090
* @param shouldSkipNulls true when null values should be skipped.
9191
*/
9292
public void setShouldSkipNulls(boolean shouldSkipNulls) {
@@ -96,52 +96,56 @@ public void setShouldSkipNulls(boolean shouldSkipNulls) {
9696

9797
@Override
9898
public String getComponentType() {
99-
return "transformer"; // backwards compatibility
99+
return "header-enricher";
100100
}
101101

102102
@Override
103103
public Message<?> transform(Message<?> message) {
104-
try {
105-
Map<String, Object> headerMap = new HashMap<String, Object>(message.getHeaders());
106-
this.addHeadersFromMessageProcessor(message, headerMap);
107-
for (Map.Entry<String, ? extends HeaderValueMessageProcessor<?>> entry : this.headersToAdd.entrySet()) {
108-
String key = entry.getKey();
109-
HeaderValueMessageProcessor<?> valueProcessor = entry.getValue();
110-
111-
Boolean shouldOverwrite = valueProcessor.isOverwrite();
112-
if (shouldOverwrite == null) {
113-
shouldOverwrite = this.defaultOverwrite;
114-
}
104+
MessageHeaders messageHeaders = message.getHeaders();
115105

116-
boolean headerDoesNotExist = headerMap.get(key) == null;
106+
AbstractIntegrationMessageBuilder<?> messageBuilder =
107+
getMessageBuilderFactory()
108+
.fromMessage(message);
117109

118-
/*
119-
* Only evaluate value expression if necessary
120-
*/
121-
if (headerDoesNotExist || shouldOverwrite) {
122-
Object value = valueProcessor.processMessage(message);
123-
if (value != null || !this.shouldSkipNulls) {
124-
headerMap.put(key, value);
125-
}
110+
addHeadersFromMessageProcessor(message, messageBuilder);
111+
for (Map.Entry<String, ? extends HeaderValueMessageProcessor<?>> entry : this.headersToAdd.entrySet()) {
112+
String key = entry.getKey();
113+
HeaderValueMessageProcessor<?> valueProcessor = entry.getValue();
114+
115+
Boolean shouldOverwrite = valueProcessor.isOverwrite();
116+
if (shouldOverwrite == null) {
117+
shouldOverwrite = this.defaultOverwrite;
118+
}
119+
120+
boolean headerDoesNotExist = messageHeaders.get(key) == null;
121+
122+
/*
123+
* Only evaluate value expression if necessary
124+
*/
125+
if (headerDoesNotExist || shouldOverwrite) {
126+
Object value = valueProcessor.processMessage(message);
127+
if (value != null || !this.shouldSkipNulls) {
128+
messageBuilder.setHeader(key, value);
126129
}
127130
}
128-
return this.getMessageBuilderFactory().withPayload(message.getPayload()).copyHeaders(headerMap).build();
129-
}
130-
catch (Exception e) {
131-
throw new MessagingException(message, "failed to transform message headers", e);
132131
}
132+
133+
return messageBuilder.build();
133134
}
134135

135-
private void addHeadersFromMessageProcessor(Message<?> message, Map<String, Object> headerMap) {
136+
private void addHeadersFromMessageProcessor(Message<?> message,
137+
AbstractIntegrationMessageBuilder<?> messageBuilder) {
138+
136139
if (this.messageProcessor != null) {
137140
Object result = this.messageProcessor.processMessage(message);
138141
if (result instanceof Map) {
142+
MessageHeaders messageHeaders = message.getHeaders();
139143
Map<?, ?> resultMap = (Map<?, ?>) result;
140144
for (Entry<?, ?> entry : resultMap.entrySet()) {
141145
Object key = entry.getKey();
142146
if (key instanceof String) {
143-
if (this.defaultOverwrite || headerMap.get(key) == null) {
144-
headerMap.put((String) key, entry.getValue());
147+
if (this.defaultOverwrite || messageHeaders.get(key) == null) {
148+
messageBuilder.setHeader((String) key, entry.getValue());
145149
}
146150
}
147151
else if (logger.isDebugEnabled()) {

spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,7 @@ public static class ContextConfiguration {
133133
@Bean
134134
public IntegrationFlow testGateway() {
135135
return f -> f.gateway("processChannel", g -> g.replyChannel("replyChannel"))
136-
.log()
137-
.bridge();
136+
.logAndReply();
138137
}
139138

140139
@Bean

spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ public IntegrationFlow enricherFlow() {
266266
.propertyFunction("date", m -> new Date())
267267
.headerExpression("foo", "payload['name']")
268268
)
269-
.get();
269+
.logAndReply();
270270
}
271271

272272
@Bean

spring-integration-core/src/test/java/org/springframework/integration/history/MessageHistoryIntegrationTests.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -61,7 +61,8 @@ public void testNoHistoryAwareMessageHandler() {
6161
for (ConsumerEndpointFactoryBean cefBean : cefBeans.values()) {
6262
DirectFieldAccessor bridgeAccessor = new DirectFieldAccessor(cefBean);
6363
String handlerClassName = bridgeAccessor.getPropertyValue("handler").getClass().getName();
64-
assertFalse("org.springframework.integration.config.MessageHistoryWritingMessageHandler".equals(handlerClassName));
64+
assertFalse("org.springframework.integration.config.MessageHistoryWritingMessageHandler"
65+
.equals(handlerClassName));
6566
}
6667
ac.close();
6768
}
@@ -72,10 +73,12 @@ public void testMessageHistoryWithHistoryWriter() {
7273
MessageHistoryIntegrationTests.class);
7374
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
7475
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
75-
MessageHandler handler = Mockito.spy(new MessageHandler() {
76+
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes
77+
7678
@Override
7779
public void handleMessage(Message<?> message) {
78-
Iterator<Properties> historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
80+
Iterator<Properties> historyIterator = message.getHeaders()
81+
.get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
7982

8083
Properties event = historyIterator.next();
8184
assertEquals("sampleGateway", event.getProperty(MessageHistory.NAME_PROPERTY));
@@ -95,7 +98,7 @@ public void handleMessage(Message<?> message) {
9598

9699
event = historyIterator.next();
97100
assertEquals("testHeaderEnricher", event.getProperty(MessageHistory.NAME_PROPERTY));
98-
assertEquals("transformer", event.getProperty(MessageHistory.TYPE_PROPERTY));
101+
assertEquals("header-enricher", event.getProperty(MessageHistory.TYPE_PROPERTY));
99102

100103
event = historyIterator.next();
101104
assertEquals("chainChannel", event.getProperty(MessageHistory.NAME_PROPERTY));
@@ -106,7 +109,8 @@ public void handleMessage(Message<?> message) {
106109
assertEquals("chain", event.getProperty(MessageHistory.TYPE_PROPERTY));
107110

108111
event = historyIterator.next();
109-
assertEquals("sampleChain$child.service-activator-within-chain", event.getProperty(MessageHistory.NAME_PROPERTY));
112+
assertEquals("sampleChain$child.service-activator-within-chain", event
113+
.getProperty(MessageHistory.NAME_PROPERTY));
110114
assertEquals("service-activator", event.getProperty(MessageHistory.TYPE_PROPERTY));
111115

112116
event = historyIterator.next();
@@ -155,7 +159,8 @@ public void testMessageHistoryWithoutHistoryWriter() {
155159
MessageHistoryIntegrationTests.class);
156160
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
157161
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
158-
MessageHandler handler = Mockito.spy(new MessageHandler() {
162+
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes
163+
159164
@Override
160165
public void handleMessage(Message<?> message) {
161166
assertNull(message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class));
@@ -175,7 +180,8 @@ public void testMessageHistoryParser() {
175180
"messageHistoryWithHistoryWriterNamespace.xml", MessageHistoryIntegrationTests.class);
176181
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
177182
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
178-
MessageHandler handler = Mockito.spy(new MessageHandler() {
183+
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes
184+
179185
@Override
180186
public void handleMessage(Message<?> message) {
181187
Iterator<Properties> historyIterator = message.getHeaders()
@@ -197,10 +203,12 @@ public void testMessageHistoryParserWithNamePatterns() {
197203
"messageHistoryWithHistoryWriterNamespaceAndPatterns.xml", MessageHistoryIntegrationTests.class);
198204
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
199205
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
200-
MessageHandler handler = Mockito.spy(new MessageHandler() {
206+
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes
207+
201208
@Override
202209
public void handleMessage(Message<?> message) {
203-
Iterator<Properties> historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
210+
Iterator<Properties> historyIterator = message.getHeaders()
211+
.get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
204212
assertTrue(historyIterator.hasNext());
205213
Properties gatewayHistory = historyIterator.next();
206214
assertEquals("sampleGateway", gatewayHistory.get("name"));
@@ -224,7 +232,8 @@ public void testMessageHistoryMoreThanOneNamespaceFail() {
224232
MessageHistoryIntegrationTests.class).close();
225233
}
226234

227-
@Test @Ignore
235+
@Test
236+
@Ignore
228237
public void testMessageHistoryWithHistoryPerformance() {
229238
ConfigurableApplicationContext acWithHistory = new ClassPathXmlApplicationContext("perfWithMessageHistory.xml",
230239
MessageHistoryIntegrationTests.class);
@@ -264,7 +273,9 @@ public void testMessageHistoryWithHistoryPerformance() {
264273
}
265274

266275
public interface SampleGateway {
276+
267277
Message<?> echo(String value);
278+
268279
}
269280

270281
}

src/reference/asciidoc/dsl.adoc

Lines changed: 24 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -511,14 +511,31 @@ public IntegrationFlow integerFlow() {
511511
----
512512
====
513513

514-
We also register a `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`.
515-
// TODO We don't show how to register a `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`.
514+
We also can register some `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`:
515+
516+
====
517+
[source,java]
518+
----
519+
@Bean
520+
@IntegrationConverter
521+
public BytesToIntegerConverter bytesToIntegerConverter() {
522+
return new BytesToIntegerConverter();
523+
}
524+
525+
@Bean
526+
public IntegrationFlow integerFlow() {
527+
return IntegrationFlows.from("input")
528+
.handle(Integer.class, (p, h) -> p * 2)
529+
.get();
530+
}
531+
----
532+
====
516533

517534
[[java-dsl-log]]
518535
=== Operator log()
519536

520537
For convenience, to log the message journey through the Spring Integration flow (`<logging-channel-adapter>`), a `log()` operator is presented.
521-
Internally, it is represented by the `WireTap` `ChannelInterceptor` and `LoggingHandler` as subscriber.
538+
Internally, it is represented by the `WireTap` `ChannelInterceptor` with a `LoggingHandler` as its subscriber.
522539
It is responsible for logging the incoming message into the next endpoint or the current channel.
523540
The following example shows how to use `LoggingHandler`:
524541

@@ -533,6 +550,10 @@ The following example shows how to use `LoggingHandler`:
533550

534551
In the preceding example, an `id` header is logged at the `ERROR` level onto `test.category` only for messages that passed the filter and before routing.
535552

553+
When this operator is used at the end of a flow, it is a one-way handler and the flow ends.
554+
To make it as a reply-producing flow, you can either use a simple `bridge()` after the `log()` or, starting with version 5.1, you can use a `logAndReply()` operator instead.
555+
`logAndReply` can only be used at the end of a flow.
556+
536557
[[java-dsl-wiretap]]
537558
=== `MessageChannelSpec.wireTap()`
538559

@@ -589,26 +610,6 @@ The following example does not have any channel declaration:
589610

590611
In the preceding example (and any time no channel has been declared), an implicit `DirectChannel` is injected in the current position of the `IntegrationFlow` and used as an output channel for the currently configured `ServiceActivatingHandler` (from the `.handle()`, <<java-dsl-handle,described earlier>>).
591612

592-
[IMPORTANT]
593-
====
594-
If `log()` or `wireTap()` are used in the end of the flow, they are considered to be one-way `MessageHandler` instances.
595-
If you expect the integration flow to return a reply, you should add a `bridge()` should to the end, after `log()` or `wireTap()`, as the following example shows:
596-
597-
[source,java]
598-
----
599-
@Bean
600-
public IntegrationFlow sseFlow() {
601-
return IntegrationFlows
602-
.from(WebFlux.inboundGateway("/sse")
603-
.requestMapping(m ->
604-
m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
605-
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
606-
.log(LoggingHandler.Level.WARN)
607-
.bridge()
608-
.get();
609-
}
610-
----
611-
====
612613

613614
[[java-dsl-flows]]
614615
=== Working With Message Flows

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ The following changes have been made in version 5.1:
3636

3737
The `IntegrationFlowContext` is now an interface and `IntegrationFlowRegistration` is an inner interface of `IntegrationFlowContext`.
3838

39+
A new `logAndReply()` operator has been introduced for convenience when you wish to log at the end of a flow for request-reply configurations.
40+
This avoid confusion with `log()` which is treated as a one-way end flow component.
41+
3942
[[x5.1-dispatcher-exceptions]]
4043
==== Dispatcher Exceptions
4144

0 commit comments

Comments
 (0)