diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java index bfdbe677de8..f8e755cf68d 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java @@ -16,7 +16,9 @@ package org.springframework.integration.endpoint; +import java.time.Duration; import java.util.Collection; +import java.util.Date; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Executor; @@ -24,6 +26,7 @@ import java.util.stream.Collectors; import org.aopalliance.aop.Advice; +import org.reactivestreams.Subscription; import org.springframework.aop.framework.ProxyFactory; import org.springframework.beans.factory.BeanClassLoaderAware; @@ -43,6 +46,7 @@ import org.springframework.messaging.MessagingException; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.PeriodicTrigger; +import org.springframework.scheduling.support.SimpleTriggerContext; import org.springframework.transaction.interceptor.TransactionInterceptor; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationManager; @@ -51,6 +55,10 @@ import org.springframework.util.CollectionUtils; import org.springframework.util.ErrorHandler; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; + /** * @author Mark Fisher * @author Oleg Zhurakousky @@ -66,23 +74,27 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement private boolean syncExecutor = true; + private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader(); + + private Trigger trigger = new PeriodicTrigger(10); + + private long maxMessagesPerPoll = -1; + private ErrorHandler errorHandler; private boolean errorHandlerIsDefault; - private Trigger trigger = new PeriodicTrigger(10); - private List adviceChain; - private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader(); + private TransactionSynchronizationFactory transactionSynchronizationFactory; - private long maxMessagesPerPoll = -1; + private volatile Callable> pollingTask; - private TransactionSynchronizationFactory transactionSynchronizationFactory; + private volatile Flux> pollingFlux; - private volatile ScheduledFuture runningTask; + private volatile Subscription subscription; - private volatile Runnable poller; + private volatile ScheduledFuture runningTask; private volatile boolean initialized; @@ -167,6 +179,14 @@ protected boolean isReceiveOnlyAdvice(Advice advice) { protected void applyReceiveOnlyAdviceChain(Collection chain) { } + protected boolean isReactive() { + return false; + } + + protected Flux> getPollingFlux() { + return this.pollingFlux; + } + @Override protected void onInit() { synchronized (this.initializationMonitor) { @@ -200,8 +220,30 @@ protected void onInit() { } } + // LifecycleSupport implementation + + @Override // guarded by super#lifecycleLock + protected void doStart() { + if (!this.initialized) { + onInit(); + } + + this.pollingTask = createPollingTask(); + + if (isReactive()) { + this.pollingFlux = createFluxGenerator(); + } + else { + Assert.state(getTaskScheduler() != null, "unable to start polling, no taskScheduler available"); + + this.runningTask = + getTaskScheduler() + .schedule(createPoller(), this.trigger); + } + } + @SuppressWarnings("unchecked") - private Runnable createPoller() throws Exception { + private Callable> createPollingTask() { List receiveOnlyAdviceChain = null; if (!CollectionUtils.isEmpty(this.adviceChain)) { receiveOnlyAdviceChain = this.adviceChain.stream() @@ -209,7 +251,7 @@ private Runnable createPoller() throws Exception { .collect(Collectors.toList()); } - Callable pollingTask = this::doPoll; + Callable> pollingTask = this::doPoll; List adviceChain = this.adviceChain; if (!CollectionUtils.isEmpty(adviceChain)) { @@ -219,65 +261,122 @@ private Runnable createPoller() throws Exception { .filter(advice -> !isReceiveOnlyAdvice(advice)) .forEach(proxyFactory::addAdvice); } - pollingTask = (Callable) proxyFactory.getProxy(this.beanClassLoader); + pollingTask = (Callable>) proxyFactory.getProxy(this.beanClassLoader); } if (!CollectionUtils.isEmpty(receiveOnlyAdviceChain)) { applyReceiveOnlyAdviceChain(receiveOnlyAdviceChain); } - return new Poller(pollingTask); + + return pollingTask; } - // LifecycleSupport implementation + private Runnable createPoller() { + return () -> + this.taskExecutor.execute(() -> { + int count = 0; + while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) { + if (pollForMessage() == null) { + break; + } + count++; + } + }); + } - @Override // guarded by super#lifecycleLock - protected void doStart() { - if (!this.initialized) { - this.onInit(); - } - Assert.state(this.getTaskScheduler() != null, - "unable to start polling, no taskScheduler available"); + private Flux> createFluxGenerator() { + SimpleTriggerContext triggerContext = new SimpleTriggerContext(); + + return Flux + .generate(sink -> { + Date date = this.trigger.nextExecutionTime(triggerContext); + if (date != null) { + triggerContext.update(date, null, null); + long millis = date.getTime() - System.currentTimeMillis(); + sink.next(Duration.ofMillis(millis)); + } + else { + sink.complete(); + } + }) + .concatMap(duration -> + Mono.delay(duration) + .doOnNext(l -> + triggerContext.update(triggerContext.lastScheduledExecutionTime(), + new Date(), null)) + .flatMapMany(l -> + Flux + .>generate(fluxSink -> { + Message message = pollForMessage(); + if (message != null) { + fluxSink.next(message); + } + else { + fluxSink.complete(); + } + }) + .take(this.maxMessagesPerPoll) + .subscribeOn(Schedulers.fromExecutor(this.taskExecutor)) + .doOnComplete(() -> + triggerContext.update(triggerContext.lastScheduledExecutionTime(), + triggerContext.lastActualExecutionTime(), + new Date()) + )), 1) + .repeat(this::isRunning) + .doOnSubscribe(subscription -> this.subscription = subscription); + } + + private Message pollForMessage() { try { - this.poller = createPoller(); + return this.pollingTask.call(); } catch (Exception e) { - this.initialized = false; - throw new MessagingException("Failed to create Poller", e); + if (e instanceof MessagingException) { + throw (MessagingException) e; + } + else { + Message failedMessage = null; + if (this.transactionSynchronizationFactory != null) { + Object resource = TransactionSynchronizationManager.getResource(getResourceToBind()); + if (resource instanceof IntegrationResourceHolder) { + failedMessage = ((IntegrationResourceHolder) resource).getMessage(); + } + } + throw new MessagingException(failedMessage, e); + } } - this.runningTask = this.getTaskScheduler().schedule(this.poller, this.trigger); - } - - @Override // guarded by super#lifecycleLock - protected void doStop() { - if (this.runningTask != null) { - this.runningTask.cancel(true); + finally { + if (this.transactionSynchronizationFactory != null) { + Object resource = getResourceToBind(); + if (TransactionSynchronizationManager.hasResource(resource)) { + TransactionSynchronizationManager.unbindResource(resource); + } + } } - this.runningTask = null; } - private boolean doPoll() { - IntegrationResourceHolder holder = this.bindResourceHolderIfNecessary( - this.getResourceKey(), this.getResourceToBind()); - Message message = null; + private Message doPoll() { + IntegrationResourceHolder holder = bindResourceHolderIfNecessary(getResourceKey(), getResourceToBind()); + Message message; try { - message = this.receiveMessage(); + message = receiveMessage(); } catch (Exception e) { if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Poll interrupted - during stop()? : " + e.getMessage()); } - return false; + return null; } else { throw (RuntimeException) e; } } - boolean result; + if (message == null) { if (this.logger.isDebugEnabled()) { this.logger.debug("Received no Message during the poll, returning 'false'"); } - result = false; + return null; } else { if (this.logger.isDebugEnabled()) { @@ -286,20 +385,35 @@ private boolean doPoll() { if (holder != null) { holder.setMessage(message); } - try { - this.handleMessage(message); - } - catch (Exception e) { - if (e instanceof MessagingException) { - throw new MessagingExceptionWrapper(message, (MessagingException) e); + + if (!isReactive()) { + try { + handleMessage(message); } - else { - throw new MessagingException(message, e); + catch (Exception e) { + if (e instanceof MessagingException) { + throw new MessagingExceptionWrapper(message, (MessagingException) e); + } + else { + throw new MessagingException(message, e); + } } } - result = true; } - return result; + + return message; + } + + @Override // guarded by super#lifecycleLock + protected void doStop() { + if (this.runningTask != null) { + this.runningTask.cancel(true); + } + this.runningTask = null; + + if (this.subscription != null) { + this.subscription.cancel(); + } } /** @@ -369,57 +483,4 @@ private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Obje return null; } - /** - * Default Poller implementation - */ - private final class Poller implements Runnable { - - private final Callable pollingTask; - - Poller(Callable pollingTask) { - this.pollingTask = pollingTask; - } - - @Override - public void run() { - AbstractPollingEndpoint.this.taskExecutor.execute(() -> { - int count = 0; - while (AbstractPollingEndpoint.this.initialized - && (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0 - || count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) { - try { - if (!Poller.this.pollingTask.call()) { - break; - } - count++; - } - catch (Exception e) { - if (e instanceof MessagingException) { - throw (MessagingException) e; - } - else { - Message failedMessage = null; - if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) { - Object resource = TransactionSynchronizationManager.getResource(getResourceToBind()); - if (resource instanceof IntegrationResourceHolder) { - failedMessage = ((IntegrationResourceHolder) resource).getMessage(); - } - } - throw new MessagingException(failedMessage, e); - } - } - finally { - if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) { - Object resource = getResourceToBind(); - if (TransactionSynchronizationManager.hasResource(resource)) { - TransactionSynchronizationManager.unbindResource(resource); - } - } - } - } - }); - } - - } - } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java index 3ca5e8b62df..dfb1d462d2a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/PollingConsumer.java @@ -21,8 +21,11 @@ import java.util.Iterator; import java.util.List; +import org.reactivestreams.Subscriber; + import org.springframework.context.Lifecycle; import org.springframework.integration.channel.ExecutorChannelInterceptorAware; +import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.router.MessageRouter; import org.springframework.integration.support.utils.IntegrationUtils; @@ -97,6 +100,12 @@ public MessageHandler getHandler() { return this.handler; } + @Override + protected boolean isReactive() { + return getOutputChannel() instanceof ReactiveStreamsSubscribableChannel && + this.handler instanceof Subscriber; + } + @Override protected void doStart() { if (this.handler instanceof Lifecycle) { diff --git a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java index 8b278c6c709..9bee1279fa2 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/endpoint/SourcePollingChannelAdapter.java @@ -31,6 +31,7 @@ import org.springframework.integration.acks.AckUtils; import org.springframework.integration.acks.AcknowledgmentCallback; import org.springframework.integration.aop.MessageSourceMutator; +import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel; import org.springframework.integration.context.ExpressionCapable; import org.springframework.integration.core.MessageSource; import org.springframework.integration.core.MessagingTemplate; @@ -169,6 +170,11 @@ protected void applyReceiveOnlyAdviceChain(Collection chain) { } } + @Override + protected boolean isReactive() { + return getOutputChannel() instanceof ReactiveStreamsSubscribableChannel; + } + private NameMatchMethodPointcutAdvisor adviceToReceiveAdvisor(Advice advice) { NameMatchMethodPointcutAdvisor sourceAdvisor = new NameMatchMethodPointcutAdvisor(advice); sourceAdvisor.addMethodName("receive"); @@ -181,6 +187,10 @@ protected void doStart() { ((Lifecycle) this.source).start(); } super.doStart(); + + if (isReactive()) { + ((ReactiveStreamsSubscribableChannel) this.outputChannel).subscribeTo(getPollingFlux()); + } } @@ -197,7 +207,7 @@ protected void doStop() { protected void onInit() { Assert.notNull(this.source, "source must not be null"); Assert.state((this.outputChannelName == null && this.outputChannel != null) - || (this.outputChannelName != null && this.outputChannel == null), + || (this.outputChannelName != null && this.outputChannel == null), "One and only one of 'outputChannelName' or 'outputChannel' is required."); super.onInit(); if (this.getBeanFactory() != null) { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java index 4394789cf04..df6f963dd70 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/configuration/EnableIntegrationTests.java @@ -523,7 +523,7 @@ public void testMetaAnnotations() { assertTrue(TestUtils.getPropertyValue(consumer, "autoStartup", Boolean.class)); assertEquals(23, TestUtils.getPropertyValue(consumer, "phase")); assertSame(context.getBean("annInput1"), TestUtils.getPropertyValue(consumer, "inputChannel")); - assertEquals("annOutput", TestUtils.getPropertyValue(consumer, "handler.outputChannelName")); + assertEquals("annOutput", TestUtils.getPropertyValue(consumer, "handler.outputChannel.beanName")); assertSame(context.getBean("annAdvice1"), TestUtils.getPropertyValue(consumer, "handler.adviceChain", List.class).get(0)); assertEquals(2000L, TestUtils.getPropertyValue(consumer, "trigger.period")); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/PollingTransactionTests.java b/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/PollingTransactionTests.java index 252a101d653..0918d461409 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/PollingTransactionTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/dispatcher/PollingTransactionTests.java @@ -89,8 +89,7 @@ public void transactionWithCommitAndAdvices() throws InterruptedException { List adviceChain = TestUtils.getPropertyValue(advisedPoller, "adviceChain", List.class); assertEquals(4, adviceChain.size()); advisedPoller.start(); - Runnable poller = TestUtils.getPropertyValue(advisedPoller, "poller", Runnable.class); - Callable pollingTask = TestUtils.getPropertyValue(poller, "pollingTask", Callable.class); + Callable pollingTask = TestUtils.getPropertyValue(advisedPoller, "pollingTask", Callable.class); assertTrue("Poller is not Advised", pollingTask instanceof Advised); Advisor[] advisors = ((Advised) pollingTask).getAdvisors(); assertEquals(4, advisors.length); diff --git a/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveInboundChannelAdapterTests.java b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveInboundChannelAdapterTests.java new file mode 100644 index 00000000000..96c5be0eb95 --- /dev/null +++ b/spring-integration-core/src/test/java/org/springframework/integration/endpoint/ReactiveInboundChannelAdapterTests.java @@ -0,0 +1,97 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.endpoint; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.SimpleAsyncTaskExecutor; +import org.springframework.core.task.TaskExecutor; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.Poller; +import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +/** + * @author Artem Bilan + * + * @since 5.1 + */ +@SpringJUnitConfig +@DirtiesContext +public class ReactiveInboundChannelAdapterTests { + + @Autowired + private FluxMessageChannel fluxMessageChannel; + + @Test + public void testReactiveInboundChannelAdapter() { + Flux testFlux = + Flux.from(this.fluxMessageChannel) + .map(Message::getPayload) + .cast(Integer.class); + + StepVerifier.create(testFlux) + .expectNext(2, 4, 6, 8, 10, 12, 14, 16) + .thenCancel() + .verify(); + } + + @Configuration + @EnableIntegration + public static class Config { + + @Bean + public AtomicInteger counter() { + return new AtomicInteger(); + } + + @Bean + public TaskExecutor taskExecutor() { + return new SimpleAsyncTaskExecutor(); + } + + @Bean + @InboundChannelAdapter(value = "fluxChannel", + poller = @Poller(fixedDelay = "100", maxMessagesPerPoll = "3", taskExecutor = "taskExecutor")) + public Supplier counterMessageSupplier() { + return () -> { + int i = counter().incrementAndGet(); + return i % 2 == 0 ? i : null; + }; + } + + @Bean + public MessageChannel fluxChannel() { + return new FluxMessageChannel(); + } + + } + +} diff --git a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java index 7df66ee3494..2ee0c8937c3 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -46,7 +47,6 @@ import org.springframework.core.convert.converter.Converter; import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.core.convert.support.GenericConversionService; -import org.springframework.expression.Expression; import org.springframework.expression.common.LiteralExpression; import org.springframework.integration.annotation.Gateway; import org.springframework.integration.annotation.GatewayHeader; @@ -68,6 +68,7 @@ * @author Oleg Zhurakousky * @author Gunnar Hillert * @author Gary Russell + * @author Artem Bilan */ public class GatewayProxyFactoryBeanTests { @@ -91,13 +92,16 @@ public void testRequestReplyWithAnonymousChannelConvertedTypeViaConversionServic QueueChannel requestChannel = new QueueChannel(); startResponder(requestChannel); GenericConversionService cs = new DefaultConversionService(); - Converter stringToByteConverter = new Converter() { + Converter stringToByteConverter = + // Has to an interface (not lambda) to honor Mockito + new Converter() { - @Override - public byte[] convert(String source) { - return source.getBytes(); - } - }; + @Override + public byte[] convert(String source) { + return source.getBytes(); + } + + }; stringToByteConverter = spy(stringToByteConverter); cs.addConverter(stringToByteConverter); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(); @@ -134,7 +138,7 @@ public void testOneWay() throws Exception { @Test public void testSolicitResponse() throws Exception { QueueChannel replyChannel = new QueueChannel(); - replyChannel.send(new GenericMessage("foo")); + replyChannel.send(new GenericMessage<>("foo")); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(); proxyFactory.setServiceInterface(TestService.class); proxyFactory.setDefaultRequestChannel(new DirectChannel()); @@ -169,7 +173,7 @@ public void testRequestReplyWithTypeConversion() throws Exception { final QueueChannel requestChannel = new QueueChannel(); new Thread(() -> { Message input = requestChannel.receive(); - GenericMessage reply = new GenericMessage(input.getPayload() + "456"); + GenericMessage reply = new GenericMessage<>(input.getPayload() + "456"); ((MessageChannel) input.getHeaders().getReplyChannel()).send(reply); }).start(); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(); @@ -229,7 +233,7 @@ public void testMultipleMessagesWithResponseCorrelator() throws InterruptedExcep latch.countDown(); }); } - latch.await(30, TimeUnit.SECONDS); + assertTrue(latch.await(30, TimeUnit.SECONDS)); for (int i = 0; i < numRequests; i++) { assertEquals("test-" + i + "!!!", results[i]); } @@ -251,7 +255,7 @@ public void testMessageAsMethodArgument() throws Exception { proxyFactory.setBeanFactory(mock(BeanFactory.class)); proxyFactory.afterPropertiesSet(); TestService service = (TestService) proxyFactory.getObject(); - String result = service.requestReplyWithMessageParameter(new GenericMessage("foo")); + String result = service.requestReplyWithMessageParameter(new GenericMessage<>("foo")); assertEquals("foobar", result); } @@ -275,7 +279,7 @@ public void testMessageAsReturnValue() throws Exception { final QueueChannel requestChannel = new QueueChannel(); new Thread(() -> { Message input = requestChannel.receive(); - GenericMessage reply = new GenericMessage(input.getPayload() + "bar"); + GenericMessage reply = new GenericMessage<>(input.getPayload() + "bar"); ((MessageChannel) input.getHeaders().getReplyChannel()).send(reply); }).start(); GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(); @@ -345,7 +349,7 @@ public void handleMessage(Message message) { private static void startResponder(final PollableChannel requestChannel) { new Thread(() -> { Message input = requestChannel.receive(); - GenericMessage reply = new GenericMessage(input.getPayload() + "bar"); + GenericMessage reply = new GenericMessage<>(input.getPayload() + "bar"); ((MessageChannel) input.getHeaders().getReplyChannel()).send(reply); }).start(); } @@ -359,7 +363,7 @@ public void testProgrammaticWiring() throws Exception { gpfb.setDefaultRequestChannel(drc); gpfb.setDefaultReplyTimeout(0L); GatewayMethodMetadata meta = new GatewayMethodMetadata(); - meta.setHeaderExpressions(Collections.singletonMap("foo", new LiteralExpression("bar"))); + meta.setHeaderExpressions(Collections.singletonMap("foo", new LiteralExpression("bar"))); gpfb.setGlobalMethodMetadata(meta); gpfb.afterPropertiesSet(); ((TestEchoService) gpfb.getObject()).echo("foo"); @@ -385,7 +389,8 @@ public void testIdHeaderOverrideHeaderExpression() { } catch (Exception e) { assertThat(e, instanceOf(BeanInitializationException.class)); - assertThat(e.getMessage(), containsString("Messaging Gateway cannot override 'id' and 'timestamp' read-only headers")); + assertThat(e + .getMessage(), containsString("Messaging Gateway cannot override 'id' and 'timestamp' read-only headers")); } } @@ -401,7 +406,8 @@ public void testIdHeaderOverrideGatewayHeaderAnnotation() { } catch (Exception e) { assertThat(e, instanceOf(BeanInitializationException.class)); - assertThat(e.getMessage(), containsString("Messaging Gateway cannot override 'id' and 'timestamp' read-only headers")); + assertThat(e + .getMessage(), containsString("Messaging Gateway cannot override 'id' and 'timestamp' read-only headers")); } } @@ -417,7 +423,8 @@ public void testTimeStampHeaderOverrideParamHeaderAnnotation() { } catch (Exception e) { assertThat(e, instanceOf(BeanInitializationException.class)); - assertThat(e.getMessage(), containsString("Messaging Gateway cannot override 'id' and 'timestamp' read-only headers")); + assertThat(e + .getMessage(), containsString("Messaging Gateway cannot override 'id' and 'timestamp' read-only headers")); } } @@ -480,6 +487,7 @@ public static void throwTestException() throws TestException { interface TestEchoService { Message echo(String s); + } @@ -487,17 +495,20 @@ interface HeadersOverwriteService { @Gateway(headers = @GatewayHeader(name = MessageHeaders.ID, value = "id")) Message echo(String s); + } interface HeadersParamService { Message echo(String s, @Header(MessageHeaders.TIMESTAMP) String foo); + } interface TestExceptionThrowingInterface { String throwCheckedException(String s) throws TestException; + } interface InheritSuper { @@ -530,6 +541,7 @@ public static class TestClient { public TestClient(TestService service) { this.service = service; } + } } diff --git a/spring-integration-core/src/test/java/org/springframework/integration/handler/advice/AdvisedMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/handler/advice/AdvisedMessageHandlerTests.java index d4b5a9ba348..691c9f090fd 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/handler/advice/AdvisedMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/handler/advice/AdvisedMessageHandlerTests.java @@ -99,7 +99,7 @@ public class AdvisedMessageHandlerTests { @Test public void circuitBreakerExceptionText() { - GenericMessage message = new GenericMessage("foo"); + GenericMessage message = new GenericMessage<>("foo"); try { input.send(message); fail("expected exception"); @@ -353,7 +353,7 @@ protected Object handleRequestMessage(Message requestMessage) { @Test @SuppressWarnings("rawtypes") - public void circuitBreakerTests() throws Exception { + public void circuitBreakerTests() { final AtomicBoolean doFail = new AtomicBoolean(); AbstractReplyProducingMessageHandler handler = new AbstractReplyProducingMessageHandler() { @@ -384,7 +384,7 @@ protected Object handleRequestMessage(Message requestMessage) { handler.afterPropertiesSet(); doFail.set(true); - Message message = new GenericMessage("Hello, world!"); + Message message = new GenericMessage<>("Hello, world!"); try { handler.handleMessage(message); fail("Expected failure"); @@ -483,7 +483,7 @@ protected Object handleRequestMessage(Message requestMessage) { handler.setBeanFactory(mock(BeanFactory.class)); handler.afterPropertiesSet(); - Message message = new GenericMessage("Hello, world!"); + Message message = new GenericMessage<>("Hello, world!"); handler.handleMessage(message); assertTrue(counter.get() == -1); Message reply = replies.receive(10000); @@ -511,13 +511,13 @@ protected Object handleRequestMessage(Message requestMessage) { advice.setRetryStateGenerator(message -> new DefaultRetryState(message.getHeaders().getId())); - List adviceChain = new ArrayList(); + List adviceChain = new ArrayList<>(); adviceChain.add(advice); handler.setAdviceChain(adviceChain); handler.setBeanFactory(mock(BeanFactory.class)); handler.afterPropertiesSet(); - Message message = new GenericMessage("Hello, world!"); + Message message = new GenericMessage<>("Hello, world!"); for (int i = 0; i < 3; i++) { try { handler.handleMessage(message); @@ -583,13 +583,13 @@ private void defaultStatefulRetryRecoverAfterThirdTryGuts(final AtomicInteger co AbstractReplyProducingMessageHandler handler, QueueChannel replies, RequestHandlerRetryAdvice advice) { advice.setRecoveryCallback(context -> "baz"); - List adviceChain = new ArrayList(); + List adviceChain = new ArrayList<>(); adviceChain.add(advice); handler.setAdviceChain(adviceChain); handler.setBeanFactory(mock(BeanFactory.class)); handler.afterPropertiesSet(); - Message message = new GenericMessage("Hello, world!"); + Message message = new GenericMessage<>("Hello, world!"); for (int i = 0; i < 4; i++) { try { handler.handleMessage(message); @@ -617,13 +617,13 @@ protected Object handleRequestMessage(Message requestMessage) { ErrorMessageSendingRecoverer recoverer = new ErrorMessageSendingRecoverer(errors); advice.setRecoveryCallback(recoverer); - List adviceChain = new ArrayList(); + List adviceChain = new ArrayList<>(); adviceChain.add(advice); handler.setAdviceChain(adviceChain); handler.setBeanFactory(mock(BeanFactory.class)); handler.afterPropertiesSet(); - Message message = new GenericMessage("Hello, world!"); + Message message = new GenericMessage<>("Hello, world!"); handler.handleMessage(message); Message error = errors.receive(10000); assertNotNull(error); @@ -658,13 +658,13 @@ public boolean canRetry(RetryContext context) { advice.setBeanFactory(mock(BeanFactory.class)); advice.afterPropertiesSet(); - List adviceChain = new ArrayList(); + List adviceChain = new ArrayList<>(); adviceChain.add(advice); handler.setAdviceChain(adviceChain); handler.setBeanFactory(mock(BeanFactory.class)); handler.afterPropertiesSet(); - Message message = new GenericMessage("Hello, world!"); + Message message = new GenericMessage<>("Hello, world!"); handler.handleMessage(message); Message error = errors.receive(10000); assertNotNull(error); @@ -685,7 +685,7 @@ protected Object handleRequestMessage(Message requestMessage) { } }; - List adviceChain = new ArrayList(); + List adviceChain = new ArrayList<>(); adviceChain.add(new RequestHandlerRetryAdvice()); adviceChain.add((MethodInterceptor) invocation -> { @@ -698,7 +698,7 @@ protected Object handleRequestMessage(Message requestMessage) { handler.afterPropertiesSet(); try { - handler.handleMessage(new GenericMessage("test")); + handler.handleMessage(new GenericMessage<>("test")); } catch (Exception e) { Throwable cause = e.getCause(); @@ -724,7 +724,7 @@ protected Object handleRequestMessage(Message requestMessage) { QueueChannel replies = new QueueChannel(); handler.setOutputChannel(replies); - List adviceChain = new ArrayList(); + List adviceChain = new ArrayList<>(); ExpressionEvaluatingRequestHandlerAdvice expressionAdvice = new ExpressionEvaluatingRequestHandlerAdvice(); expressionAdvice.setBeanFactory(mock(BeanFactory.class)); @@ -750,7 +750,7 @@ protected Object doInvoke(ExecutionCallback callback, Object target, Message handler.setBeanFactory(mock(BeanFactory.class)); handler.afterPropertiesSet(); - handler.handleMessage(new GenericMessage("test")); + handler.handleMessage(new GenericMessage<>("test")); Message receive = replies.receive(10000); assertNotNull(receive); assertEquals("intentional: 3", receive.getPayload()); @@ -772,7 +772,7 @@ protected Object handleRequestMessage(Message requestMessage) { QueueChannel errors = new QueueChannel(); - List adviceChain = new ArrayList(); + List adviceChain = new ArrayList<>(); ExpressionEvaluatingRequestHandlerAdvice expressionAdvice = new ExpressionEvaluatingRequestHandlerAdvice(); expressionAdvice.setBeanFactory(mock(BeanFactory.class)); @@ -790,7 +790,7 @@ protected Object handleRequestMessage(Message requestMessage) { handler.afterPropertiesSet(); try { - handler.handleMessage(new GenericMessage("test")); + handler.handleMessage(new GenericMessage<>("test")); } catch (Exception e) { assertEquals("intentional: 3", e.getCause().getMessage()); @@ -833,7 +833,7 @@ protected Object doInvoke(ExecutionCallback callback, Object target, Message Method method = AbstractReplyProducingMessageHandler.class.getDeclaredMethod("handleRequestMessage", Message.class); when(methodInvocation.getMethod()).thenReturn(method); - when(methodInvocation.getArguments()).thenReturn(new Object[] { new GenericMessage("foo") }); + when(methodInvocation.getArguments()).thenReturn(new Object[] { new GenericMessage<>("foo") }); try { doAnswer(invocation -> { throw theThrowable; @@ -851,7 +851,7 @@ protected Object doInvoke(ExecutionCallback callback, Object target, Message * ThrowableHolderException from the output message. */ @Test - public void throwableProperlyPropagatedAndReported() throws Exception { + public void throwableProperlyPropagatedAndReported() { QueueChannel errors = new QueueChannel(); ExpressionEvaluatingRequestHandlerAdvice expressionAdvice = new ExpressionEvaluatingRequestHandlerAdvice(); @@ -866,7 +866,7 @@ public void throwableProperlyPropagatedAndReported() throws Exception { Bar fooHandler = (Bar) proxyFactory.getProxy(); try { - fooHandler.handleRequestMessage(new GenericMessage("foo")); + fooHandler.handleRequestMessage(new GenericMessage<>("foo")); fail("Expected throwable"); } catch (Throwable t) { @@ -898,12 +898,12 @@ protected Object doInvoke(ExecutionCallback callback, Object target, Message consumer.setTaskScheduler(mock(TaskScheduler.class)); consumer.start(); - Callable pollingTask = TestUtils.getPropertyValue(consumer, "poller.pollingTask", Callable.class); + Callable pollingTask = TestUtils.getPropertyValue(consumer, "pollingTask", Callable.class); assertTrue(AopUtils.isAopProxy(pollingTask)); Log logger = TestUtils.getPropertyValue(advice, "logger", Log.class); logger = spy(logger); when(logger.isWarnEnabled()).thenReturn(Boolean.TRUE); - final AtomicReference logMessage = new AtomicReference(); + final AtomicReference logMessage = new AtomicReference<>(); doAnswer(invocation -> { logMessage.set(invocation.getArgument(0)); return null; @@ -925,7 +925,7 @@ public void filterDiscardNoAdvice() { MessageFilter filter = new MessageFilter(message -> false); QueueChannel discardChannel = new QueueChannel(); filter.setDiscardChannel(discardChannel); - filter.handleMessage(new GenericMessage("foo")); + filter.handleMessage(new GenericMessage<>("foo")); assertNotNull(discardChannel.receive(0)); } @@ -948,7 +948,7 @@ protected Object doInvoke(ExecutionCallback callback, Object target, Message filter.setAdviceChain(adviceChain); filter.setBeanFactory(mock(BeanFactory.class)); filter.afterPropertiesSet(); - filter.handleMessage(new GenericMessage("foo")); + filter.handleMessage(new GenericMessage<>("foo")); assertNotNull(discardedWithinAdvice.get()); assertNull(discardChannel.receive(0)); } @@ -958,7 +958,7 @@ public void filterDiscardOutsideAdvice() { MessageFilter filter = new MessageFilter(message -> false); final QueueChannel discardChannel = new QueueChannel(); filter.setDiscardChannel(discardChannel); - List adviceChain = new ArrayList(); + List adviceChain = new ArrayList<>(); final AtomicReference> discardedWithinAdvice = new AtomicReference>(); final AtomicBoolean adviceCalled = new AtomicBoolean(); adviceChain.add(new AbstractRequestHandlerAdvice() { @@ -975,7 +975,7 @@ protected Object doInvoke(ExecutionCallback callback, Object target, Message filter.setDiscardWithinAdvice(false); filter.setBeanFactory(mock(BeanFactory.class)); filter.afterPropertiesSet(); - filter.handleMessage(new GenericMessage("foo")); + filter.handleMessage(new GenericMessage<>("foo")); assertTrue(adviceCalled.get()); assertNull(discardedWithinAdvice.get()); assertNotNull(discardChannel.receive(0)); @@ -1022,13 +1022,13 @@ protected Object handleRequestMessage(Message requestMessage) { advice.setRetryTemplate(retryTemplate); - List adviceChain = new ArrayList(); + List adviceChain = new ArrayList<>(); adviceChain.add(advice); handler.setAdviceChain(adviceChain); handler.setBeanFactory(mock(BeanFactory.class)); handler.afterPropertiesSet(); - Message message = new GenericMessage("Hello, world!"); + Message message = new GenericMessage<>("Hello, world!"); try { handler.handleMessage(message); fail("MessagingException expected."); @@ -1042,7 +1042,7 @@ protected Object handleRequestMessage(Message requestMessage) { } @Test - public void enhancedRecoverer() throws Exception { + public void enhancedRecoverer() { QueueChannel channel = new QueueChannel(); ErrorMessageSendingRecoverer recoverer = new ErrorMessageSendingRecoverer(channel); recoverer.publish(new GenericMessage<>("foo"), new GenericMessage<>("bar"), new RuntimeException("baz")); diff --git a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/config/FtpsInboundChannelAdapterParserTests.java b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/config/FtpsInboundChannelAdapterParserTests.java index e11ac3f72d6..04ec2eb0282 100644 --- a/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/config/FtpsInboundChannelAdapterParserTests.java +++ b/spring-integration-ftp/src/test/java/org/springframework/integration/ftp/config/FtpsInboundChannelAdapterParserTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2016 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ * @author Oleg Zhurakousky * @author Gunnar Hillert * @author Gary Russell + * @author Artem Bilan */ @ContextConfiguration @RunWith(SpringJUnit4ClassRunner.class) @@ -49,16 +50,16 @@ public class FtpsInboundChannelAdapterParserTests { private MessageChannel ftpChannel; @Test - public void testFtpsInboundChannelAdapterComplete() throws Exception { + public void testFtpsInboundChannelAdapterComplete() { assertEquals("ftpInbound", ftpInbound.getComponentName()); assertEquals("ftp:inbound-channel-adapter", ftpInbound.getComponentType()); - assertNotNull(TestUtils.getPropertyValue(ftpInbound, "poller")); + assertNotNull(TestUtils.getPropertyValue(ftpInbound, "pollingTask")); assertEquals(this.ftpChannel, TestUtils.getPropertyValue(ftpInbound, "outputChannel")); FtpInboundFileSynchronizingMessageSource inbound = - (FtpInboundFileSynchronizingMessageSource) TestUtils.getPropertyValue(ftpInbound, "source"); + (FtpInboundFileSynchronizingMessageSource) TestUtils.getPropertyValue(ftpInbound, "source"); FtpInboundFileSynchronizer fisync = - (FtpInboundFileSynchronizer) TestUtils.getPropertyValue(inbound, "synchronizer"); + (FtpInboundFileSynchronizer) TestUtils.getPropertyValue(inbound, "synchronizer"); assertNotNull(TestUtils.getPropertyValue(fisync, "filter")); }