Skip to content

INT-4457: Make log() in the end as reply-aware #2535

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.integration.dsl;

import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
Expand Down Expand Up @@ -92,6 +93,7 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -2740,7 +2742,9 @@ else if (this.currentMessageChannel != null) {
.stream()
.reduce((first, second) -> second);
if (lastComponent.get() instanceof WireTapSpec) {
channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
enrichHeaders(
Collections.singletonMap(
MessageHeaders.REPLY_CHANNEL, IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably add a comment here to explain why - I will do it when merging.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... Makes sense.
This is really a cool trick since we did a replyChannel resolution against a reply message as fallback.
So, this way the HeaderEnricher will be able to produce its reply to that nullChannel only if there is no a replyChannel in the request message.

However I'm starting to think that this should be something like new NullChannel() - definitely an internal matter and free from any management and metrics.

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... while writing the comment, it made me think that we have a problem here...

This logic will mask a "lost reply channel mistake".

Right now if we have

.from(gateway)
.transform() // this transformer has a bug - removes the replyChannel header.
.wiretap()
.bridge()
.get()

We'll get the infamous no output-channel or replyChannel header available error and quickly determine the problem.

With this change, we'll no longer get the error and the gateway will hang or timeout.

I think this might cause us a bunch of Stack Overflow grief.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you mean.

To choices: close as "Works as Designed" or spend more time to think what could we do. For example to figure out some how how to intercept a replyChannel evaluated at runtime...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a couple of ideas:

  1. Detect that the flow starts with a gateway and automatically add a bridge to nowhere - I rejected this because someone could add a replyChannel header mid-flow.
  2. Add a new property to the wireTap and log specs - something like .log().andReply(), which would only be allowed on the last component.

But WAD works for me too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess .andReply() would just be a synonym for bridge() - but maybe semantically more pleasing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, the flow might not start from the gateway, but simple channel.
That channel can be used for sending from the one-way scenario, as well as from the gateway one, on the other hand.
So, this our flow is not aware of the upstream in advance.

I believe we can restore code because it is stable and just document how to be in such a double case as long as the replyChannel propagation is supplied properly during the flow...

Another option do not allow to register log/wire-tap in the end. As far as it is a channel interceptor there really should be an channel to apply to...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just logAndReply() which is indeed a synonym for the log().bridge() and represent's no-op when is used in the middle?

The double situation mentioned above is fully not related to the story and should be revised from the target application perspective.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logAndReply() sounds good - couldn't we detect it's used mid-flow by checking the previous component?

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,11 +29,11 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.integration.context.IntegrationObjectSupport;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.support.AbstractIntegrationMessageBuilder;
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.transformer.support.HeaderValueMessageProcessor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;

/**
* A Transformer that adds statically configured header values to a Message.
Expand Down Expand Up @@ -64,12 +64,13 @@ public HeaderEnricher() {

/**
* Create a HeaderEnricher with the given map of headers.
*
* @param headersToAdd The headers to add.
*/
public HeaderEnricher(Map<String, ? extends HeaderValueMessageProcessor<?>> headersToAdd) {
this.headersToAdd = (headersToAdd != null) ? headersToAdd
: new HashMap<String, HeaderValueMessageProcessor<Object>>();
this.headersToAdd =
headersToAdd != null
? headersToAdd
: new HashMap<String, HeaderValueMessageProcessor<Object>>();
}

public <T> void setMessageProcessor(MessageProcessor<T> messageProcessor) {
Expand All @@ -86,7 +87,6 @@ public void setDefaultOverwrite(boolean defaultOverwrite) {
* <code>true</code>. Set this to <code>false</code> if a
* <code>null</code> value should trigger <i>removal</i> of the
* corresponding header instead.
*
* @param shouldSkipNulls true when null values should be skipped.
*/
public void setShouldSkipNulls(boolean shouldSkipNulls) {
Expand All @@ -96,52 +96,56 @@ public void setShouldSkipNulls(boolean shouldSkipNulls) {

@Override
public String getComponentType() {
return "transformer"; // backwards compatibility
return "header-enricher";
}

@Override
public Message<?> transform(Message<?> message) {
try {
Map<String, Object> headerMap = new HashMap<String, Object>(message.getHeaders());
this.addHeadersFromMessageProcessor(message, headerMap);
for (Map.Entry<String, ? extends HeaderValueMessageProcessor<?>> entry : this.headersToAdd.entrySet()) {
String key = entry.getKey();
HeaderValueMessageProcessor<?> valueProcessor = entry.getValue();

Boolean shouldOverwrite = valueProcessor.isOverwrite();
if (shouldOverwrite == null) {
shouldOverwrite = this.defaultOverwrite;
}
MessageHeaders messageHeaders = message.getHeaders();

boolean headerDoesNotExist = headerMap.get(key) == null;
AbstractIntegrationMessageBuilder<?> messageBuilder =
getMessageBuilderFactory()
.fromMessage(message);

/*
* Only evaluate value expression if necessary
*/
if (headerDoesNotExist || shouldOverwrite) {
Object value = valueProcessor.processMessage(message);
if (value != null || !this.shouldSkipNulls) {
headerMap.put(key, value);
}
addHeadersFromMessageProcessor(message, messageBuilder);
for (Map.Entry<String, ? extends HeaderValueMessageProcessor<?>> entry : this.headersToAdd.entrySet()) {
String key = entry.getKey();
HeaderValueMessageProcessor<?> valueProcessor = entry.getValue();

Boolean shouldOverwrite = valueProcessor.isOverwrite();
if (shouldOverwrite == null) {
shouldOverwrite = this.defaultOverwrite;
}

boolean headerDoesNotExist = messageHeaders.get(key) == null;

/*
* Only evaluate value expression if necessary
*/
if (headerDoesNotExist || shouldOverwrite) {
Object value = valueProcessor.processMessage(message);
if (value != null || !this.shouldSkipNulls) {
messageBuilder.setHeader(key, value);
}
}
return this.getMessageBuilderFactory().withPayload(message.getPayload()).copyHeaders(headerMap).build();
}
catch (Exception e) {
throw new MessagingException(message, "failed to transform message headers", e);
}

return messageBuilder.build();
}

private void addHeadersFromMessageProcessor(Message<?> message, Map<String, Object> headerMap) {
private void addHeadersFromMessageProcessor(Message<?> message,
AbstractIntegrationMessageBuilder<?> messageBuilder) {

if (this.messageProcessor != null) {
Object result = this.messageProcessor.processMessage(message);
if (result instanceof Map) {
MessageHeaders messageHeaders = message.getHeaders();
Map<?, ?> resultMap = (Map<?, ?>) result;
for (Entry<?, ?> entry : resultMap.entrySet()) {
Object key = entry.getKey();
if (key instanceof String) {
if (this.defaultOverwrite || headerMap.get(key) == null) {
headerMap.put((String) key, entry.getValue());
if (this.defaultOverwrite || messageHeaders.get(key) == null) {
messageBuilder.setHeader((String) key, entry.getValue());
}
}
else if (logger.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ public static class ContextConfiguration {
@Bean
public IntegrationFlow testGateway() {
return f -> f.gateway("processChannel", g -> g.replyChannel("replyChannel"))
.log()
.bridge();
.log();
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -61,7 +61,8 @@ public void testNoHistoryAwareMessageHandler() {
for (ConsumerEndpointFactoryBean cefBean : cefBeans.values()) {
DirectFieldAccessor bridgeAccessor = new DirectFieldAccessor(cefBean);
String handlerClassName = bridgeAccessor.getPropertyValue("handler").getClass().getName();
assertFalse("org.springframework.integration.config.MessageHistoryWritingMessageHandler".equals(handlerClassName));
assertFalse("org.springframework.integration.config.MessageHistoryWritingMessageHandler"
.equals(handlerClassName));
}
ac.close();
}
Expand All @@ -72,10 +73,12 @@ public void testMessageHistoryWithHistoryWriter() {
MessageHistoryIntegrationTests.class);
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
MessageHandler handler = Mockito.spy(new MessageHandler() {
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes

@Override
public void handleMessage(Message<?> message) {
Iterator<Properties> historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
Iterator<Properties> historyIterator = message.getHeaders()
.get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();

Properties event = historyIterator.next();
assertEquals("sampleGateway", event.getProperty(MessageHistory.NAME_PROPERTY));
Expand All @@ -95,7 +98,7 @@ public void handleMessage(Message<?> message) {

event = historyIterator.next();
assertEquals("testHeaderEnricher", event.getProperty(MessageHistory.NAME_PROPERTY));
assertEquals("transformer", event.getProperty(MessageHistory.TYPE_PROPERTY));
assertEquals("header-enricher", event.getProperty(MessageHistory.TYPE_PROPERTY));

event = historyIterator.next();
assertEquals("chainChannel", event.getProperty(MessageHistory.NAME_PROPERTY));
Expand All @@ -106,7 +109,8 @@ public void handleMessage(Message<?> message) {
assertEquals("chain", event.getProperty(MessageHistory.TYPE_PROPERTY));

event = historyIterator.next();
assertEquals("sampleChain$child.service-activator-within-chain", event.getProperty(MessageHistory.NAME_PROPERTY));
assertEquals("sampleChain$child.service-activator-within-chain", event
.getProperty(MessageHistory.NAME_PROPERTY));
assertEquals("service-activator", event.getProperty(MessageHistory.TYPE_PROPERTY));

event = historyIterator.next();
Expand Down Expand Up @@ -155,7 +159,8 @@ public void testMessageHistoryWithoutHistoryWriter() {
MessageHistoryIntegrationTests.class);
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
MessageHandler handler = Mockito.spy(new MessageHandler() {
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes

@Override
public void handleMessage(Message<?> message) {
assertNull(message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class));
Expand All @@ -175,7 +180,8 @@ public void testMessageHistoryParser() {
"messageHistoryWithHistoryWriterNamespace.xml", MessageHistoryIntegrationTests.class);
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
MessageHandler handler = Mockito.spy(new MessageHandler() {
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes

@Override
public void handleMessage(Message<?> message) {
Iterator<Properties> historyIterator = message.getHeaders()
Expand All @@ -197,10 +203,12 @@ public void testMessageHistoryParserWithNamePatterns() {
"messageHistoryWithHistoryWriterNamespaceAndPatterns.xml", MessageHistoryIntegrationTests.class);
SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class);
DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class);
MessageHandler handler = Mockito.spy(new MessageHandler() {
MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes

@Override
public void handleMessage(Message<?> message) {
Iterator<Properties> historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
Iterator<Properties> historyIterator = message.getHeaders()
.get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
Expand All @@ -224,7 +232,8 @@ public void testMessageHistoryMoreThanOneNamespaceFail() {
MessageHistoryIntegrationTests.class).close();
}

@Test @Ignore
@Test
@Ignore
public void testMessageHistoryWithHistoryPerformance() {
ConfigurableApplicationContext acWithHistory = new ClassPathXmlApplicationContext("perfWithMessageHistory.xml",
MessageHistoryIntegrationTests.class);
Expand Down Expand Up @@ -264,7 +273,9 @@ public void testMessageHistoryWithHistoryPerformance() {
}

public interface SampleGateway {

Message<?> echo(String value);

}

}
41 changes: 19 additions & 22 deletions src/reference/asciidoc/dsl.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,25 @@ public IntegrationFlow integerFlow() {
----
====

We also register a `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`.
// TODO We don't show how to register a `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`.
We also can register some `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`:

====
[source,java]
----
@Bean
@IntegrationConverter
public BytesToIntegerConverter bytesToIntegerConverter() {
return new BytesToIntegerConverter();
}

@Bean
public IntegrationFlow integerFlow() {
return IntegrationFlows.from("input")
.handle(Integer.class, (p, h) -> p * 2)
.get();
}
----
====

[[java-dsl-log]]
=== Operator log()
Expand Down Expand Up @@ -589,26 +606,6 @@ The following example does not have any channel declaration:

In the preceding example (and any time no channel has been declared), an implicit `DirectChannel` is injected in the current position of the `IntegrationFlow` and used as an output channel for the currently configured `ServiceActivatingHandler` (from the `.handle()`, <<java-dsl-handle,described earlier>>).

[IMPORTANT]
====
If `log()` or `wireTap()` are used in the end of the flow, they are considered to be one-way `MessageHandler` instances.
If you expect the integration flow to return a reply, you should add a `bridge()` should to the end, after `log()` or `wireTap()`, as the following example shows:

[source,java]
----
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m ->
m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.log(LoggingHandler.Level.WARN)
.bridge()
.get();
}
----
====

[[java-dsl-flows]]
=== Working With Message Flows
Expand Down