Skip to content

INT-4457: Make log() in the end as reply-aware #2535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,11 +29,11 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;

/**
* A Transformer that adds statically configured header values to a Message.
Expand Down Expand Up @@ -64,12 +64,13 @@ public HeaderEnricher() {

/**
* Create a HeaderEnricher with the given map of headers.
*
* @param headersToAdd The headers to add.
*/
public HeaderEnricher(Map<String, ? extends HeaderValueMessageProcessor<?>> headersToAdd) {
this.headersToAdd = (headersToAdd != null) ? headersToAdd
: new HashMap<String, HeaderValueMessageProcessor<Object>>();
this.headersToAdd =
headersToAdd != null
? headersToAdd
: new HashMap<String, HeaderValueMessageProcessor<Object>>();
}

public <T> void setMessageProcessor(MessageProcessor<T> messageProcessor) {
Expand All @@ -86,7 +87,6 @@ public void setDefaultOverwrite(boolean defaultOverwrite) {
* <code>true</code>. Set this to <code>false</code> if a
* <code>null</code> value should trigger <i>removal</i> of the
* corresponding header instead.
*
* @param shouldSkipNulls true when null values should be skipped.
*/
public void setShouldSkipNulls(boolean shouldSkipNulls) {
Expand All @@ -96,52 +96,56 @@ public void setShouldSkipNulls(boolean shouldSkipNulls) {

@Override
public String getComponentType() {
return "transformer"; // backwards compatibility
return "header-enricher";
}

@Override
public Message<?> transform(Message<?> message) {
try {
Map<String, Object> headerMap = new HashMap<String, Object>(message.getHeaders());
this.addHeadersFromMessageProcessor(message, headerMap);
for (Map.Entry<String, ? extends HeaderValueMessageProcessor<?>> entry : this.headersToAdd.entrySet()) {
String key = entry.getKey();
HeaderValueMessageProcessor<?> valueProcessor = entry.getValue();

Boolean shouldOverwrite = valueProcessor.isOverwrite();
if (shouldOverwrite == null) {
shouldOverwrite = this.defaultOverwrite;
}
MessageHeaders messageHeaders = message.getHeaders();

boolean headerDoesNotExist = headerMap.get(key) == null;
AbstractIntegrationMessageBuilder<?> messageBuilder =
getMessageBuilderFactory()
.fromMessage(message);

/*
* Only evaluate value expression if necessary
*/
if (headerDoesNotExist || shouldOverwrite) {
Object value = valueProcessor.processMessage(message);
if (value != null || !this.shouldSkipNulls) {
headerMap.put(key, value);
}
addHeadersFromMessageProcessor(message, messageBuilder);
for (Map.Entry<String, ? extends HeaderValueMessageProcessor<?>> entry : this.headersToAdd.entrySet()) {
String key = entry.getKey();
HeaderValueMessageProcessor<?> valueProcessor = entry.getValue();

Boolean shouldOverwrite = valueProcessor.isOverwrite();
if (shouldOverwrite == null) {
shouldOverwrite = this.defaultOverwrite;
}

boolean headerDoesNotExist = messageHeaders.get(key) == null;

/*
* Only evaluate value expression if necessary
*/
if (headerDoesNotExist || shouldOverwrite) {
Object value = valueProcessor.processMessage(message);
if (value != null || !this.shouldSkipNulls) {
messageBuilder.setHeader(key, value);
}
}
return this.getMessageBuilderFactory().withPayload(message.getPayload()).copyHeaders(headerMap).build();
}
catch (Exception e) {
throw new MessagingException(message, "failed to transform message headers", e);
}

return messageBuilder.build();
}

private void addHeadersFromMessageProcessor(Message<?> message, Map<String, Object> headerMap) {
private void addHeadersFromMessageProcessor(Message<?> message,
AbstractIntegrationMessageBuilder<?> messageBuilder) {

if (this.messageProcessor != null) {
Object result = this.messageProcessor.processMessage(message);
if (result instanceof Map) {
MessageHeaders messageHeaders = message.getHeaders();
Map<?, ?> resultMap = (Map<?, ?>) result;
for (Entry<?, ?> entry : resultMap.entrySet()) {
Object key = entry.getKey();
if (key instanceof String) {
if (this.defaultOverwrite || headerMap.get(key) == null) {
headerMap.put((String) key, entry.getValue());
if (this.defaultOverwrite || messageHeaders.get(key) == null) {
messageBuilder.setHeader((String) key, entry.getValue());
}
}
else if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ public static class ContextConfiguration {
@Bean
public IntegrationFlow testGateway() {
return f -> f.gateway("processChannel", g -> g.replyChannel("replyChannel"))
.log()
.bridge();
.logAndReply();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public IntegrationFlow enricherFlow() {
.propertyFunction("date", m -> new Date())
.headerExpression("foo", "payload['name']")
)
.get();
.logAndReply();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,7 +61,8 @@ public void testNoHistoryAwareMessageHandler() {
for (ConsumerEndpointFactoryBean cefBean : cefBeans.values()) {
DirectFieldAccessor bridgeAccessor = new DirectFieldAccessor(cefBean);
String handlerClassName = bridgeAccessor.getPropertyValue("handler").getClass().getName();
assertFalse("org.springframework.integration.config.MessageHistoryWritingMessageHandler".equals(handlerClassName));
assertFalse("org.springframework.integration.config.MessageHistoryWritingMessageHandler"
.equals(handlerClassName));
}
ac.close();
}
Expand All @@ -72,10 +73,12 @@ public void testMessageHistoryWithHistoryWriter() {
MessageHistoryIntegrationTests.class);
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
MessageHandler handler = Mockito.spy(new MessageHandler() {
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes

@Override
public void handleMessage(Message<?> message) {
Iterator<Properties> historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
Iterator<Properties> historyIterator = message.getHeaders()
.get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();

Properties event = historyIterator.next();
assertEquals("sampleGateway", event.getProperty(MessageHistory.NAME_PROPERTY));
Expand All @@ -95,7 +98,7 @@ public void handleMessage(Message<?> message) {

event = historyIterator.next();
assertEquals("testHeaderEnricher", event.getProperty(MessageHistory.NAME_PROPERTY));
assertEquals("transformer", event.getProperty(MessageHistory.TYPE_PROPERTY));
assertEquals("header-enricher", event.getProperty(MessageHistory.TYPE_PROPERTY));

event = historyIterator.next();
assertEquals("chainChannel", event.getProperty(MessageHistory.NAME_PROPERTY));
Expand All @@ -106,7 +109,8 @@ public void handleMessage(Message<?> message) {
assertEquals("chain", event.getProperty(MessageHistory.TYPE_PROPERTY));

event = historyIterator.next();
assertEquals("sampleChain$child.service-activator-within-chain", event.getProperty(MessageHistory.NAME_PROPERTY));
assertEquals("sampleChain$child.service-activator-within-chain", event
.getProperty(MessageHistory.NAME_PROPERTY));
assertEquals("service-activator", event.getProperty(MessageHistory.TYPE_PROPERTY));

event = historyIterator.next();
Expand Down Expand Up @@ -155,7 +159,8 @@ public void testMessageHistoryWithoutHistoryWriter() {
MessageHistoryIntegrationTests.class);
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
MessageHandler handler = Mockito.spy(new MessageHandler() {
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes

@Override
public void handleMessage(Message<?> message) {
assertNull(message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class));
Expand All @@ -175,7 +180,8 @@ public void testMessageHistoryParser() {
"messageHistoryWithHistoryWriterNamespace.xml", MessageHistoryIntegrationTests.class);
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
MessageHandler handler = Mockito.spy(new MessageHandler() {
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes

@Override
public void handleMessage(Message<?> message) {
Iterator<Properties> historyIterator = message.getHeaders()
Expand All @@ -197,10 +203,12 @@ public void testMessageHistoryParserWithNamePatterns() {
"messageHistoryWithHistoryWriterNamespaceAndPatterns.xml", MessageHistoryIntegrationTests.class);
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
MessageHandler handler = Mockito.spy(new MessageHandler() {
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes

@Override
public void handleMessage(Message<?> message) {
Iterator<Properties> historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
Iterator<Properties> historyIterator = message.getHeaders()
.get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
Expand All @@ -224,7 +232,8 @@ public void testMessageHistoryMoreThanOneNamespaceFail() {
MessageHistoryIntegrationTests.class).close();
}

@Test @Ignore
@Test
@Ignore
public void testMessageHistoryWithHistoryPerformance() {
ConfigurableApplicationContext acWithHistory = new ClassPathXmlApplicationContext("perfWithMessageHistory.xml",
MessageHistoryIntegrationTests.class);
Expand Down Expand Up @@ -264,7 +273,9 @@ public void testMessageHistoryWithHistoryPerformance() {
}

public interface SampleGateway {

Message<?> echo(String value);

}

}
45 changes: 23 additions & 22 deletions src/reference/asciidoc/dsl.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,25 @@ public IntegrationFlow integerFlow() {
----
====

We also register a `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`.
// TODO We don't show how to register a `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`.
We also can register some `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`:

====
[source,java]
----
@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
----
====

[[java-dsl-log]]
=== Operator log()
Expand All @@ -533,6 +550,10 @@ The following example shows how to use `LoggingHandler`:

In the preceding example, an `id` header is logged at the `ERROR` level onto `test.category` only for messages that passed the filter and before routing.

When this operator is used in the end of flow, it is considered as a one-way handler and and treated as the end of flow.
To make it as a reply-producing flow, you need to use a simple `bridge()` after or just use introduced in version 5.1 a `logAndReply()` operator.
This one can be used only in the end of flow.

[[java-dsl-wiretap]]
=== `MessageChannelSpec.wireTap()`

Expand Down Expand Up @@ -589,26 +610,6 @@ The following example does not have any channel declaration:

In the preceding example (and any time no channel has been declared), an implicit `DirectChannel` is injected in the current position of the `IntegrationFlow` and used as an output channel for the currently configured `ServiceActivatingHandler` (from the `.handle()`, <<java-dsl-handle,described earlier>>).

[IMPORTANT]
====
If `log()` or `wireTap()` are used in the end of the flow, they are considered to be one-way `MessageHandler` instances.
If you expect the integration flow to return a reply, you should add a `bridge()` should to the end, after `log()` or `wireTap()`, as the following example shows:

[source,java]
----
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m ->
m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.log(LoggingHandler.Level.WARN)
.bridge()
.get();
}
----
====

[[java-dsl-flows]]
=== Working With Message Flows
Expand Down
2 changes: 2 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ The following changes have been made in version 5.1:

The `IntegrationFlowContext` is now an interface and `IntegrationFlowRegistration` is an inner interface of `IntegrationFlowContext`.

A new `logAndReply()` operator has been introduced for convenience in the end of flow for request-reply configurations to avoid confusion with the `log()` which is treated as one-way end flow component.

[[x5.1-dispatcher-exceptions]]
==== Dispatcher Exceptions

Expand Down