From 9551b76bd9f5970b52ce3ab73388ff7efaac2d06 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 2 Aug 2018 16:36:35 -0400 Subject: [PATCH 1/4] INT-4457: Make log() in the end as reply-aware JIRA: https://jira.spring.io/browse/INT-4457 When an `IntegrationFlow` is reply-based (we expect a reply in the beginning) and `log()` (or `wireTap()`) is used in the end, we are forced to add an empty `bridge()` in the end to ensure a `replyChannel` header routing * Fix `IntegrationFlowDefinition` to add `enrichHeaders()` in the end to populate a `nullChannel` as a `replyChannel` header if that is missed in the request message headers. This way we cover both use-cases when we expect reply from the flow and when it is used as a one-way scenario * Improve a `HeaderEnricher` do not create a new `Message` if there are no new headers to add/remove * Remove a note from the `dsl.adoc` about now redundant `bridge()` after `log()` * Resolve TODO in the `.handle()` paragraph --- .../dsl/IntegrationFlowDefinition.java | 6 +- .../transformer/HeaderEnricher.java | 72 ++++--- .../dsl/flowservices/FlowServiceTests.java | 3 +- .../MessageHistoryIntegrationTests.java | 199 +++++++++--------- src/reference/asciidoc/dsl.adoc | 41 ++-- 5 files changed, 161 insertions(+), 160 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java index f9fb6bd2d7f..ef83920661c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -16,6 +16,7 @@ package org.springframework.integration.dsl; +import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; @@ -92,6 +93,7 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; +import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -2740,7 +2742,9 @@ else if (this.currentMessageChannel != null) { .stream() .reduce((first, second) -> second); if (lastComponent.get() instanceof WireTapSpec) { - channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME); + enrichHeaders( + Collections.singletonMap( + MessageHeaders.REPLY_CHANNEL, IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME)); } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/transformer/HeaderEnricher.java b/spring-integration-core/src/main/java/org/springframework/integration/transformer/HeaderEnricher.java index 5b65bc3718c..efaaa3e24b8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/transformer/HeaderEnricher.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/transformer/HeaderEnricher.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,11 +29,11 @@ import org.springframework.beans.factory.InitializingBean; import org.springframework.integration.context.IntegrationObjectSupport; import org.springframework.integration.handler.MessageProcessor; +import org.springframework.integration.support.AbstractIntegrationMessageBuilder; import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.transformer.support.HeaderValueMessageProcessor; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; -import org.springframework.messaging.MessagingException; /** * A Transformer that adds statically configured header values to a Message. @@ -64,12 +64,13 @@ public HeaderEnricher() { /** * Create a HeaderEnricher with the given map of headers. - * * @param headersToAdd The headers to add. */ public HeaderEnricher(Map> headersToAdd) { - this.headersToAdd = (headersToAdd != null) ? headersToAdd - : new HashMap>(); + this.headersToAdd = + headersToAdd != null + ? headersToAdd + : new HashMap>(); } public void setMessageProcessor(MessageProcessor messageProcessor) { @@ -86,7 +87,6 @@ public void setDefaultOverwrite(boolean defaultOverwrite) { * true. Set this to false if a * null value should trigger removal of the * corresponding header instead. - * * @param shouldSkipNulls true when null values should be skipped. */ public void setShouldSkipNulls(boolean shouldSkipNulls) { @@ -96,52 +96,56 @@ public void setShouldSkipNulls(boolean shouldSkipNulls) { @Override public String getComponentType() { - return "transformer"; // backwards compatibility + return "header-enricher"; } @Override public Message transform(Message message) { - try { - Map headerMap = new HashMap(message.getHeaders()); - this.addHeadersFromMessageProcessor(message, headerMap); - for (Map.Entry> entry : this.headersToAdd.entrySet()) { - String key = entry.getKey(); - HeaderValueMessageProcessor valueProcessor = entry.getValue(); - - Boolean shouldOverwrite = valueProcessor.isOverwrite(); - if (shouldOverwrite == null) { - shouldOverwrite = this.defaultOverwrite; - } + MessageHeaders messageHeaders = message.getHeaders(); - boolean headerDoesNotExist = headerMap.get(key) == null; + AbstractIntegrationMessageBuilder messageBuilder = + getMessageBuilderFactory() + .fromMessage(message); - /* - * Only evaluate value expression if necessary - */ - if (headerDoesNotExist || shouldOverwrite) { - Object value = valueProcessor.processMessage(message); - if (value != null || !this.shouldSkipNulls) { - headerMap.put(key, value); - } + addHeadersFromMessageProcessor(message, messageBuilder); + for (Map.Entry> entry : this.headersToAdd.entrySet()) { + String key = entry.getKey(); + HeaderValueMessageProcessor valueProcessor = entry.getValue(); + + Boolean shouldOverwrite = valueProcessor.isOverwrite(); + if (shouldOverwrite == null) { + shouldOverwrite = this.defaultOverwrite; + } + + boolean headerDoesNotExist = messageHeaders.get(key) == null; + + /* + * Only evaluate value expression if necessary + */ + if (headerDoesNotExist || shouldOverwrite) { + Object value = valueProcessor.processMessage(message); + if (value != null || !this.shouldSkipNulls) { + messageBuilder.setHeader(key, value); } } - return this.getMessageBuilderFactory().withPayload(message.getPayload()).copyHeaders(headerMap).build(); - } - catch (Exception e) { - throw new MessagingException(message, "failed to transform message headers", e); } + + return messageBuilder.build(); } - private void addHeadersFromMessageProcessor(Message message, Map headerMap) { + private void addHeadersFromMessageProcessor(Message message, + AbstractIntegrationMessageBuilder messageBuilder) { + if (this.messageProcessor != null) { Object result = this.messageProcessor.processMessage(message); if (result instanceof Map) { + MessageHeaders messageHeaders = message.getHeaders(); Map resultMap = (Map) result; for (Entry entry : resultMap.entrySet()) { Object key = entry.getKey(); if (key instanceof String) { - if (this.defaultOverwrite || headerMap.get(key) == null) { - headerMap.put((String) key, entry.getValue()); + if (this.defaultOverwrite || messageHeaders.get(key) == null) { + messageBuilder.setHeader((String) key, entry.getValue()); } } else if (logger.isDebugEnabled()) { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java index 9868d2ffd4c..cc66b118e6e 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java @@ -133,8 +133,7 @@ public static class ContextConfiguration { @Bean public IntegrationFlow testGateway() { return f -> f.gateway("processChannel", g -> g.replyChannel("replyChannel")) - .log() - .bridge(); + .log(); } @Bean diff --git a/spring-integration-core/src/test/java/org/springframework/integration/history/MessageHistoryIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/history/MessageHistoryIntegrationTests.java index b2e3cf3d5f4..6de87e248af 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/history/MessageHistoryIntegrationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/history/MessageHistoryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -61,7 +61,8 @@ public void testNoHistoryAwareMessageHandler() { for (ConsumerEndpointFactoryBean cefBean : cefBeans.values()) { DirectFieldAccessor bridgeAccessor = new DirectFieldAccessor(cefBean); String handlerClassName = bridgeAccessor.getPropertyValue("handler").getClass().getName(); - assertFalse("org.springframework.integration.config.MessageHistoryWritingMessageHandler".equals(handlerClassName)); + assertFalse("org.springframework.integration.config.MessageHistoryWritingMessageHandler" + .equals(handlerClassName)); } ac.close(); } @@ -72,74 +73,75 @@ public void testMessageHistoryWithHistoryWriter() { MessageHistoryIntegrationTests.class); SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class); DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class); - MessageHandler handler = Mockito.spy(new MessageHandler() { - @Override - public void handleMessage(Message message) { - Iterator historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); - - Properties event = historyIterator.next(); - assertEquals("sampleGateway", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("gateway", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("bridgeInChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("testBridge", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("bridge", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("headerEnricherChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("testHeaderEnricher", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("transformer", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("chainChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("sampleChain", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("chain", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("sampleChain$child.service-activator-within-chain", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("service-activator", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("filterChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("testFilter", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("filter", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("splitterChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("testSplitter", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("splitter", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("aggregatorChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("testAggregator", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("aggregator", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - event = historyIterator.next(); - assertEquals("endOfThePipeChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - - MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); - replyChannel.send(message); - } + MessageHandler handler = Mockito.spy(message -> { + Iterator historyIterator = + message.getHeaders() + .get(MessageHistory.HEADER_NAME, MessageHistory.class) + .iterator(); + + Properties event = historyIterator.next(); + assertEquals("sampleGateway", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("gateway", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("bridgeInChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("testBridge", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("bridge", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("headerEnricherChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("testHeaderEnricher", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("header-enricher", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("chainChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("sampleChain", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("chain", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("sampleChain$child.service-activator-within-chain", event + .getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("service-activator", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("filterChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("testFilter", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("filter", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("splitterChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("testSplitter", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("splitter", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("aggregatorChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("testAggregator", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("aggregator", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + event = historyIterator.next(); + assertEquals("endOfThePipeChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(message); }); endOfThePipeChannel.subscribe(handler); Message result = gateway.echo("hello"); @@ -155,13 +157,10 @@ public void testMessageHistoryWithoutHistoryWriter() { MessageHistoryIntegrationTests.class); SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class); DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class); - MessageHandler handler = Mockito.spy(new MessageHandler() { - @Override - public void handleMessage(Message message) { - assertNull(message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class)); - MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); - replyChannel.send(message); - } + MessageHandler handler = Mockito.spy(message -> { + assertNull(message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class)); + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(message); }); endOfThePipeChannel.subscribe(handler); gateway.echo("hello"); @@ -175,15 +174,12 @@ public void testMessageHistoryParser() { "messageHistoryWithHistoryWriterNamespace.xml", MessageHistoryIntegrationTests.class); SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class); DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class); - MessageHandler handler = Mockito.spy(new MessageHandler() { - @Override - public void handleMessage(Message message) { - Iterator historyIterator = message.getHeaders() - .get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); - assertTrue(historyIterator.hasNext()); - MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); - replyChannel.send(message); - } + MessageHandler handler = Mockito.spy(message -> { + Iterator historyIterator = message.getHeaders() + .get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); + assertTrue(historyIterator.hasNext()); + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(message); }); endOfThePipeChannel.subscribe(handler); gateway.echo("hello"); @@ -197,20 +193,18 @@ public void testMessageHistoryParserWithNamePatterns() { "messageHistoryWithHistoryWriterNamespaceAndPatterns.xml", MessageHistoryIntegrationTests.class); SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class); DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class); - MessageHandler handler = Mockito.spy(new MessageHandler() { - @Override - public void handleMessage(Message message) { - Iterator historyIterator = message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); - assertTrue(historyIterator.hasNext()); - Properties gatewayHistory = historyIterator.next(); - assertEquals("sampleGateway", gatewayHistory.get("name")); - assertTrue(historyIterator.hasNext()); - Properties chainHistory = historyIterator.next(); - assertEquals("sampleChain", chainHistory.get("name")); - assertFalse(historyIterator.hasNext()); - MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); - replyChannel.send(message); - } + MessageHandler handler = Mockito.spy(message -> { + Iterator historyIterator = message.getHeaders() + .get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); + assertTrue(historyIterator.hasNext()); + Properties gatewayHistory = historyIterator.next(); + assertEquals("sampleGateway", gatewayHistory.get("name")); + assertTrue(historyIterator.hasNext()); + Properties chainHistory = historyIterator.next(); + assertEquals("sampleChain", chainHistory.get("name")); + assertFalse(historyIterator.hasNext()); + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(message); }); endOfThePipeChannel.subscribe(handler); gateway.echo("hello"); @@ -224,7 +218,8 @@ public void testMessageHistoryMoreThanOneNamespaceFail() { MessageHistoryIntegrationTests.class).close(); } - @Test @Ignore + @Test + @Ignore public void testMessageHistoryWithHistoryPerformance() { ConfigurableApplicationContext acWithHistory = new ClassPathXmlApplicationContext("perfWithMessageHistory.xml", MessageHistoryIntegrationTests.class); @@ -264,7 +259,9 @@ public void testMessageHistoryWithHistoryPerformance() { } public interface SampleGateway { + Message echo(String value); + } } diff --git a/src/reference/asciidoc/dsl.adoc b/src/reference/asciidoc/dsl.adoc index f1402063667..3880b942ce3 100644 --- a/src/reference/asciidoc/dsl.adoc +++ b/src/reference/asciidoc/dsl.adoc @@ -511,8 +511,25 @@ public IntegrationFlow integerFlow() { ---- ==== -We also register a `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`. -// TODO We don't show how to register a `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`. +We also can register some `BytesToIntegerConverter` within `ConversionService` to get rid of that additional `.transform()`: + +==== +[source,java] +---- +@Bean +@IntegrationConverter +public BytesToIntegerConverter bytesToIntegerConverter() { + return new BytesToIntegerConverter(); +} + +@Bean +public IntegrationFlow integerFlow() { + return IntegrationFlows.from("input") + .handle(Integer.class, (p, h) -> p * 2) + .get(); +} +---- +==== [[java-dsl-log]] === Operator log() @@ -589,26 +606,6 @@ The following example does not have any channel declaration: In the preceding example (and any time no channel has been declared), an implicit `DirectChannel` is injected in the current position of the `IntegrationFlow` and used as an output channel for the currently configured `ServiceActivatingHandler` (from the `.handle()`, <>). -[IMPORTANT] -==== -If `log()` or `wireTap()` are used in the end of the flow, they are considered to be one-way `MessageHandler` instances. -If you expect the integration flow to return a reply, you should add a `bridge()` should to the end, after `log()` or `wireTap()`, as the following example shows: - -[source,java] ----- -@Bean -public IntegrationFlow sseFlow() { - return IntegrationFlows - .from(WebFlux.inboundGateway("/sse") - .requestMapping(m -> - m.produces(MediaType.TEXT_EVENT_STREAM_VALUE))) - .handle((p, h) -> Flux.just("foo", "bar", "baz")) - .log(LoggingHandler.Level.WARN) - .bridge() - .get(); -} ----- -==== [[java-dsl-flows]] === Working With Message Flows From 6d2856249a9259a9dca5b55d877ed0e5ba26978c Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Thu, 2 Aug 2018 17:48:21 -0400 Subject: [PATCH 2/4] * Restore anonymous class in the MessageHistoryIntegrationTests: Mockito can't mock/spy lambdas because they are `final` classes --- .../MessageHistoryIntegrationTests.java | 164 ++++++++++-------- 1 file changed, 89 insertions(+), 75 deletions(-) diff --git a/spring-integration-core/src/test/java/org/springframework/integration/history/MessageHistoryIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/history/MessageHistoryIntegrationTests.java index 6de87e248af..f1b5cf1df24 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/history/MessageHistoryIntegrationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/history/MessageHistoryIntegrationTests.java @@ -73,75 +73,77 @@ public void testMessageHistoryWithHistoryWriter() { MessageHistoryIntegrationTests.class); SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class); DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class); - MessageHandler handler = Mockito.spy(message -> { - Iterator historyIterator = - message.getHeaders() - .get(MessageHistory.HEADER_NAME, MessageHistory.class) - .iterator(); + MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes - Properties event = historyIterator.next(); - assertEquals("sampleGateway", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("gateway", event.getProperty(MessageHistory.TYPE_PROPERTY)); + @Override + public void handleMessage(Message message) { + Iterator historyIterator = message.getHeaders() + .get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); - event = historyIterator.next(); - assertEquals("bridgeInChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + Properties event = historyIterator.next(); + assertEquals("sampleGateway", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("gateway", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("testBridge", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("bridge", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("bridgeInChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("headerEnricherChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("testBridge", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("bridge", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("testHeaderEnricher", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("header-enricher", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("headerEnricherChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("chainChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("testHeaderEnricher", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("header-enricher", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("sampleChain", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("chain", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("chainChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("sampleChain$child.service-activator-within-chain", event - .getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("service-activator", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("sampleChain", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("chain", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("filterChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("sampleChain$child.service-activator-within-chain", event + .getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("service-activator", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("testFilter", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("filter", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("filterChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("splitterChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("testFilter", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("filter", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("testSplitter", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("splitter", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("splitterChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("aggregatorChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("testSplitter", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("splitter", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("testAggregator", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("aggregator", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("aggregatorChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); - event = historyIterator.next(); - assertEquals("endOfThePipeChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); - assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + event = historyIterator.next(); + assertEquals("testAggregator", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("aggregator", event.getProperty(MessageHistory.TYPE_PROPERTY)); - MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); - replyChannel.send(message); + event = historyIterator.next(); + assertEquals("endOfThePipeChannel", event.getProperty(MessageHistory.NAME_PROPERTY)); + assertEquals("channel", event.getProperty(MessageHistory.TYPE_PROPERTY)); + + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(message); + } }); endOfThePipeChannel.subscribe(handler); Message result = gateway.echo("hello"); @@ -157,10 +159,14 @@ public void testMessageHistoryWithoutHistoryWriter() { MessageHistoryIntegrationTests.class); SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class); DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class); - MessageHandler handler = Mockito.spy(message -> { - assertNull(message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class)); - MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); - replyChannel.send(message); + MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes + + @Override + public void handleMessage(Message message) { + assertNull(message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class)); + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(message); + } }); endOfThePipeChannel.subscribe(handler); gateway.echo("hello"); @@ -174,12 +180,16 @@ public void testMessageHistoryParser() { "messageHistoryWithHistoryWriterNamespace.xml", MessageHistoryIntegrationTests.class); SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class); DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class); - MessageHandler handler = Mockito.spy(message -> { - Iterator historyIterator = message.getHeaders() - .get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); - assertTrue(historyIterator.hasNext()); - MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); - replyChannel.send(message); + MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes + + @Override + public void handleMessage(Message message) { + Iterator historyIterator = message.getHeaders() + .get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); + assertTrue(historyIterator.hasNext()); + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(message); + } }); endOfThePipeChannel.subscribe(handler); gateway.echo("hello"); @@ -193,18 +203,22 @@ public void testMessageHistoryParserWithNamePatterns() { "messageHistoryWithHistoryWriterNamespaceAndPatterns.xml", MessageHistoryIntegrationTests.class); SampleGateway gateway = ac.getBean("sampleGateway", SampleGateway.class); DirectChannel endOfThePipeChannel = ac.getBean("endOfThePipeChannel", DirectChannel.class); - MessageHandler handler = Mockito.spy(message -> { - Iterator historyIterator = message.getHeaders() - .get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); - assertTrue(historyIterator.hasNext()); - Properties gatewayHistory = historyIterator.next(); - assertEquals("sampleGateway", gatewayHistory.get("name")); - assertTrue(historyIterator.hasNext()); - Properties chainHistory = historyIterator.next(); - assertEquals("sampleChain", chainHistory.get("name")); - assertFalse(historyIterator.hasNext()); - MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); - replyChannel.send(message); + MessageHandler handler = Mockito.spy(new MessageHandler() { // Not a lambda: Mockito can't mock final classes + + @Override + public void handleMessage(Message message) { + Iterator historyIterator = message.getHeaders() + .get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator(); + assertTrue(historyIterator.hasNext()); + Properties gatewayHistory = historyIterator.next(); + assertEquals("sampleGateway", gatewayHistory.get("name")); + assertTrue(historyIterator.hasNext()); + Properties chainHistory = historyIterator.next(); + assertEquals("sampleChain", chainHistory.get("name")); + assertFalse(historyIterator.hasNext()); + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(message); + } }); endOfThePipeChannel.subscribe(handler); gateway.echo("hello"); From 7667228a88c88ae4996cc8e573fad38893d88ef4 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 6 Aug 2018 15:14:59 -0400 Subject: [PATCH 3/4] * Introduce `IntegrationFlowDefinition.logAndReply()` operator --- .../dsl/IntegrationFlowDefinition.java | 308 +++++++++++++++++- .../dsl/flowservices/FlowServiceTests.java | 2 +- .../dsl/transformers/TransformerTests.java | 2 +- src/reference/asciidoc/dsl.adoc | 4 + src/reference/asciidoc/whats-new.adoc | 2 + 5 files changed, 308 insertions(+), 10 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java index ef83920661c..957ab4725ab 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -16,7 +16,6 @@ package org.springframework.integration.dsl; -import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.Map; @@ -93,7 +92,6 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; -import org.springframework.messaging.MessageHeaders; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; @@ -2196,6 +2194,9 @@ public B gateway(IntegrationFlow flow, Consumer endpointCon * logging level and {@code org.springframework.integration.handler.LoggingHandler} * as a default logging category. *

The full request {@link Message} will be logged. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @return the current {@link IntegrationFlowDefinition}. * @see #wireTap(WireTapSpec) */ @@ -2209,6 +2210,9 @@ public B log() { * logging level and {@code org.springframework.integration.handler.LoggingHandler} * as a default logging category. *

The full request {@link Message} will be logged. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param level the {@link LoggingHandler.Level}. * @return the current {@link IntegrationFlowDefinition}. * @see #wireTap(WireTapSpec) @@ -2222,6 +2226,9 @@ public B log(LoggingHandler.Level level) { * with the {@link LoggingHandler} subscriber for the provided logging category * and {@code INFO} logging level. *

The full request {@link Message} will be logged. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param category the logging category to use. * @return the current {@link IntegrationFlowDefinition}. * @see #wireTap(WireTapSpec) @@ -2235,6 +2242,9 @@ public B log(String category) { * with the {@link LoggingHandler} subscriber for the provided * {@link LoggingHandler.Level} logging level and logging category. *

The full request {@link Message} will be logged. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param level the {@link LoggingHandler.Level}. * @param category the logging category to use. * @return the current {@link IntegrationFlowDefinition}. @@ -2249,6 +2259,9 @@ public B log(LoggingHandler.Level level, String category) { * with the {@link LoggingHandler} subscriber for the provided * {@link LoggingHandler.Level} logging level, logging category * and SpEL expression for the log message. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param level the {@link LoggingHandler.Level}. * @param category the logging category. * @param logExpression the SpEL expression to evaluate logger message at runtime @@ -2266,6 +2279,9 @@ public B log(LoggingHandler.Level level, String category, String logExpression) * with the {@link LoggingHandler} subscriber for the {@code INFO} logging level, * the {@code org.springframework.integration.handler.LoggingHandler} * as a default logging category and {@link Function} for the log message. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param function the function to evaluate logger message at runtime * @param

the expected payload type. * against the request {@link Message}. @@ -2283,6 +2299,9 @@ public

B log(Function, Object> function) { * the {@code org.springframework.integration.handler.LoggingHandler} * as a default logging category and SpEL expression to evaluate * logger message at runtime against the request {@link Message}. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param logExpression the {@link Expression} to evaluate logger message at runtime * against the request {@link Message}. * @return the current {@link IntegrationFlowDefinition}. @@ -2299,6 +2318,9 @@ public B log(Expression logExpression) { * the {@code org.springframework.integration.handler.LoggingHandler} * as a default logging category and SpEL expression to evaluate * logger message at runtime against the request {@link Message}. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param level the {@link LoggingHandler.Level}. * @param logExpression the {@link Expression} to evaluate logger message at runtime * against the request {@link Message}. @@ -2309,13 +2331,15 @@ public B log(LoggingHandler.Level level, Expression logExpression) { return log(level, null, logExpression); } - /** * Populate a {@link WireTap} for the {@link #currentMessageChannel} * with the {@link LoggingHandler} subscriber for the {@code INFO} * {@link LoggingHandler.Level} logging level, * the provided logging category and SpEL expression to evaluate * logger message at runtime against the request {@link Message}. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param category the logging category. * @param logExpression the {@link Expression} to evaluate logger message at runtime * against the request {@link Message}. @@ -2332,6 +2356,9 @@ public B log(String category, Expression logExpression) { * {@link LoggingHandler.Level} logging level, * the {@code org.springframework.integration.handler.LoggingHandler} * as a default logging category and {@link Function} for the log message. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param level the {@link LoggingHandler.Level}. * @param function the function to evaluate logger message at runtime * @param

the expected payload type. @@ -2348,6 +2375,9 @@ public

B log(LoggingHandler.Level level, Function, Object> functi * with the {@link LoggingHandler} subscriber for the provided * {@link LoggingHandler.Level} logging level, * the provided logging category and {@link Function} for the log message. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param category the logging category. * @param function the function to evaluate logger message at runtime * @param

the expected payload type. @@ -2364,6 +2394,9 @@ public

B log(String category, Function, Object> function) { * with the {@link LoggingHandler} subscriber for the provided * {@link LoggingHandler.Level} logging level, logging category * and {@link Function} for the log message. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param level the {@link LoggingHandler.Level}. * @param category the logging category. * @param function the function to evaluate logger message at runtime @@ -2377,12 +2410,14 @@ public

B log(LoggingHandler.Level level, String category, Function(function)); } - /** * Populate a {@link WireTap} for the {@link #currentMessageChannel} * with the {@link LoggingHandler} subscriber for the provided * {@link LoggingHandler.Level} logging level, logging category * and SpEL expression for the log message. + *

When this operator is used in the end of flow, it is treated + * as one-way handler without any replies to continue. + * The {@link #logAndReply()} should be used for request-reply configuration. * @param level the {@link LoggingHandler.Level}. * @param category the logging category. * @param logExpression the {@link Expression} to evaluate logger message at runtime @@ -2408,6 +2443,264 @@ public B log(LoggingHandler.Level level, String category, Expression logExpressi return wireTap(loggerChannel); } + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the {@code INFO} + * logging level and {@code org.springframework.integration.handler.LoggingHandler} + * as a default logging category. + *

The full request {@link Message} will be logged. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public IntegrationFlow logAndReply() { + return logAndReply(LoggingHandler.Level.INFO); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for provided {@link LoggingHandler.Level} + * logging level and {@code org.springframework.integration.handler.LoggingHandler} + * as a default logging category. + *

The full request {@link Message} will be logged. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param level the {@link LoggingHandler.Level}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public IntegrationFlow logAndReply(LoggingHandler.Level level) { + return logAndReply(level, (String) null); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the provided logging category + * and {@code INFO} logging level. + *

The full request {@link Message} will be logged. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param category the logging category to use. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public IntegrationFlow logAndReply(String category) { + return logAndReply(LoggingHandler.Level.INFO, category); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the provided + * {@link LoggingHandler.Level} logging level and logging category. + *

The full request {@link Message} will be logged. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param level the {@link LoggingHandler.Level}. + * @param category the logging category to use. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public IntegrationFlow logAndReply(LoggingHandler.Level level, String category) { + return logAndReply(level, category, (Expression) null); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the provided + * {@link LoggingHandler.Level} logging level, logging category + * and SpEL expression for the log message. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param level the {@link LoggingHandler.Level}. + * @param category the logging category. + * @param logExpression the SpEL expression to evaluate logger message at runtime + * against the request {@link Message}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public IntegrationFlow logAndReply(LoggingHandler.Level level, String category, String logExpression) { + Assert.hasText(logExpression, "'logExpression' must not be empty"); + return logAndReply(level, category, PARSER.parseExpression(logExpression)); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the {@code INFO} logging level, + * the {@code org.springframework.integration.handler.LoggingHandler} + * as a default logging category and {@link Function} for the log message. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param function the function to evaluate logger message at runtime + * @param

the expected payload type. + * against the request {@link Message}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public

IntegrationFlow logAndReply(Function, Object> function) { + Assert.notNull(function, "'function' must not be null"); + return logAndReply(new FunctionExpression<>(function)); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the {@code INFO} logging level, + * the {@code org.springframework.integration.handler.LoggingHandler} + * as a default logging category and SpEL expression to evaluate + * logger message at runtime against the request {@link Message}. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param logExpression the {@link Expression} to evaluate logger message at runtime + * against the request {@link Message}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public IntegrationFlow logAndReply(Expression logExpression) { + return logAndReply(LoggingHandler.Level.INFO, logExpression); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the provided + * {@link LoggingHandler.Level} logging level, + * the {@code org.springframework.integration.handler.LoggingHandler} + * as a default logging category and SpEL expression to evaluate + * logger message at runtime against the request {@link Message}. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param level the {@link LoggingHandler.Level}. + * @param logExpression the {@link Expression} to evaluate logger message at runtime + * against the request {@link Message}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public IntegrationFlow logAndReply(LoggingHandler.Level level, Expression logExpression) { + return logAndReply(level, null, logExpression); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the {@code INFO} + * {@link LoggingHandler.Level} logging level, + * the provided logging category and SpEL expression to evaluate + * logger message at runtime against the request {@link Message}. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param category the logging category. + * @param logExpression the {@link Expression} to evaluate logger message at runtime + * against the request {@link Message}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public IntegrationFlow logAndReply(String category, Expression logExpression) { + return logAndReply(LoggingHandler.Level.INFO, category, logExpression); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the provided + * {@link LoggingHandler.Level} logging level, + * the {@code org.springframework.integration.handler.LoggingHandler} + * as a default logging category and {@link Function} for the log message. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param level the {@link LoggingHandler.Level}. + * @param function the function to evaluate logger message at runtime + * @param

the expected payload type. + * against the request {@link Message}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public

IntegrationFlow logAndReply(LoggingHandler.Level level, Function, Object> function) { + return logAndReply(level, null, function); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the provided + * {@link LoggingHandler.Level} logging level, + * the provided logging category and {@link Function} for the log message. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param category the logging category. + * @param function the function to evaluate logger message at runtime + * @param

the expected payload type. + * against the request {@link Message}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public

IntegrationFlow logAndReply(String category, Function, Object> function) { + return logAndReply(LoggingHandler.Level.INFO, category, function); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the provided + * {@link LoggingHandler.Level} logging level, logging category + * and {@link Function} for the log message. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param level the {@link LoggingHandler.Level}. + * @param category the logging category. + * @param function the function to evaluate logger message at runtime + * @param

the expected payload type. + * against the request {@link Message}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public

IntegrationFlow logAndReply(LoggingHandler.Level level, String category, + Function, Object> function) { + + Assert.notNull(function, "'function' must not be null"); + return logAndReply(level, category, new FunctionExpression<>(function)); + } + + /** + * Populate a {@link WireTap} for the {@link #currentMessageChannel} + * with the {@link LoggingHandler} subscriber for the provided + * {@link LoggingHandler.Level} logging level, logging category + * and SpEL expression for the log message. + *

A {@link #bridge()} is added after this operator to make the flow reply-producing + * if the {@code replyChannel} header is present. + *

This operator can be used only in the end of flow. + * @param level the {@link LoggingHandler.Level}. + * @param category the logging category. + * @param logExpression the {@link Expression} to evaluate logger message at runtime + * against the request {@link Message}. + * @return an {@link IntegrationFlow} instance based on this builder. + * @see #log() + * @see #bridge() + */ + public IntegrationFlow logAndReply(LoggingHandler.Level level, String category, Expression logExpression) { + return log(level, category, logExpression) + .bridge() + .get(); + } + /** * Populate a {@link ScatterGatherHandler} to the current integration flow position * based on the provided {@link MessageChannel} for scattering function @@ -2605,6 +2898,7 @@ public Publisher> toReactivePublisher() { @SuppressWarnings("unchecked") private > B register(S endpointSpec, Consumer endpointConfigurer) { + if (endpointConfigurer != null) { endpointConfigurer.accept(endpointSpec); } @@ -2719,7 +3013,7 @@ protected StandardIntegrationFlow get() { if (this.currentMessageChannel instanceof FixedSubscriberChannelPrototype) { throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel + ") is a prototype for 'FixedSubscriberChannel' which can't be created without 'MessageHandler' " - + "constructor argument. That means that '.fixedSubscriberChannel()' can't be the last " + + "constructor argument. That means that '.fixedSubscriberChannel()' can't be as the last " + "EIP-method in the 'IntegrationFlow' definition."); } @@ -2742,9 +3036,7 @@ else if (this.currentMessageChannel != null) { .stream() .reduce((first, second) -> second); if (lastComponent.get() instanceof WireTapSpec) { - enrichHeaders( - Collections.singletonMap( - MessageHeaders.REPLY_CHANNEL, IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME)); + channel(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME); } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java index cc66b118e6e..3226f9df8dd 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/flowservices/FlowServiceTests.java @@ -133,7 +133,7 @@ public static class ContextConfiguration { @Bean public IntegrationFlow testGateway() { return f -> f.gateway("processChannel", g -> g.replyChannel("replyChannel")) - .log(); + .logAndReply(); } @Bean diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java index 4f8e268dc08..8704e024977 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java @@ -266,7 +266,7 @@ public IntegrationFlow enricherFlow() { .propertyFunction("date", m -> new Date()) .headerExpression("foo", "payload['name']") ) - .get(); + .logAndReply(); } @Bean diff --git a/src/reference/asciidoc/dsl.adoc b/src/reference/asciidoc/dsl.adoc index 3880b942ce3..713575eb3cb 100644 --- a/src/reference/asciidoc/dsl.adoc +++ b/src/reference/asciidoc/dsl.adoc @@ -550,6 +550,10 @@ The following example shows how to use `LoggingHandler`: In the preceding example, an `id` header is logged at the `ERROR` level onto `test.category` only for messages that passed the filter and before routing. +When this operator is used in the end of flow, it is considered as a one-way handler and and treated as the end of flow. +To make it as a reply-producing flow, you need to use a simple `bridge()` after or just use introduced in version 5.1 a `logAndReply()` operator. +This one can be used only in the end of flow. + [[java-dsl-wiretap]] === `MessageChannelSpec.wireTap()` diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index d28ee8ad50a..bd086e15e5c 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -36,6 +36,8 @@ The following changes have been made in version 5.1: The `IntegrationFlowContext` is now an interface and `IntegrationFlowRegistration` is an inner interface of `IntegrationFlowContext`. +A new `logAndReply()` operator has been introduced for convenience in the end of flow for request-reply configurations to avoid confusion with the `log()` which is treated as one-way end flow component. + [[x5.1-dispatcher-exceptions]] ==== Dispatcher Exceptions From ebd12410fa6233a9300166eda36c1a6ec5945366 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 6 Aug 2018 15:28:13 -0400 Subject: [PATCH 4/4] * Fix logging message --- .../integration/dsl/IntegrationFlowDefinition.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java index 957ab4725ab..a0cc473920b 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/IntegrationFlowDefinition.java @@ -3013,7 +3013,7 @@ protected StandardIntegrationFlow get() { if (this.currentMessageChannel instanceof FixedSubscriberChannelPrototype) { throw new BeanCreationException("The 'currentMessageChannel' (" + this.currentMessageChannel + ") is a prototype for 'FixedSubscriberChannel' which can't be created without 'MessageHandler' " - + "constructor argument. That means that '.fixedSubscriberChannel()' can't be as the last " + + "constructor argument. That means that '.fixedSubscriberChannel()' can't be the last " + "EIP-method in the 'IntegrationFlow' definition."); }