|
1 | 1 | /*
|
2 |
| - * Copyright 2002-2021 the original author or authors. |
| 2 | + * Copyright 2002-2022 the original author or authors. |
3 | 3 | *
|
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License");
|
5 | 5 | * you may not use this file except in compliance with the License.
|
|
84 | 84 | import org.springframework.messaging.SubscribableChannel;
|
85 | 85 | import org.springframework.messaging.core.DestinationResolutionException;
|
86 | 86 | import org.springframework.messaging.core.DestinationResolver;
|
87 |
| -import org.springframework.messaging.handler.annotation.ValueConstants; |
88 | 87 | import org.springframework.scheduling.Trigger;
|
89 | 88 | import org.springframework.scheduling.support.CronTrigger;
|
90 | 89 | import org.springframework.scheduling.support.PeriodicTrigger;
|
|
103 | 102 | * @author Mark Fisher
|
104 | 103 | * @author Artem Bilan
|
105 | 104 | * @author Gary Russell
|
| 105 | + * @author Chris Bono |
106 | 106 | */
|
107 | 107 | public abstract class AbstractMethodAnnotationPostProcessor<T extends Annotation>
|
108 | 108 | implements MethodAnnotationPostProcessor<T> {
|
@@ -363,24 +363,24 @@ protected AbstractEndpoint createEndpoint(MessageHandler handler, @SuppressWarni
|
363 | 363 | protected AbstractEndpoint doCreateEndpoint(MessageHandler handler, MessageChannel inputChannel,
|
364 | 364 | List<Annotation> annotations) {
|
365 | 365 |
|
366 |
| - Poller[] pollers = MessagingAnnotationUtils.resolveAttribute(annotations, "poller", Poller[].class); |
| 366 | + Poller poller = MessagingAnnotationUtils.resolveAttribute(annotations, "poller", Poller.class); |
| 367 | + |
367 | 368 | Reactive reactive = MessagingAnnotationUtils.resolveAttribute(annotations, "reactive", Reactive.class);
|
368 |
| - boolean reactiveProvided = reactive != null && !ValueConstants.DEFAULT_NONE.equals(reactive.value()); |
369 | 369 |
|
370 |
| - Assert.state(!reactiveProvided || ObjectUtils.isEmpty(pollers), |
| 370 | + Assert.state(reactive == null || poller == null, |
371 | 371 | "The 'poller' and 'reactive' are mutually exclusive.");
|
372 | 372 |
|
373 |
| - if (inputChannel instanceof Publisher || handler instanceof ReactiveMessageHandlerAdapter || reactiveProvided) { |
374 |
| - return reactiveStreamsConsumer(inputChannel, handler, reactiveProvided ? reactive : null); |
| 373 | + if (inputChannel instanceof Publisher || handler instanceof ReactiveMessageHandlerAdapter || reactive != null) { |
| 374 | + return reactiveStreamsConsumer(inputChannel, handler, reactive); |
375 | 375 | }
|
376 | 376 | else if (inputChannel instanceof SubscribableChannel) {
|
377 |
| - Assert.state(ObjectUtils.isEmpty(pollers), () -> |
| 377 | + Assert.state(poller == null, () -> |
378 | 378 | "A '@Poller' should not be specified for Annotation-based " +
|
379 | 379 | "endpoint, since '" + inputChannel + "' is a SubscribableChannel (not pollable).");
|
380 | 380 | return new EventDrivenConsumer((SubscribableChannel) inputChannel, handler);
|
381 | 381 | }
|
382 | 382 | else if (inputChannel instanceof PollableChannel) {
|
383 |
| - return pollingConsumer(inputChannel, handler, pollers); |
| 383 | + return pollingConsumer(inputChannel, handler, poller); |
384 | 384 | }
|
385 | 385 | else {
|
386 | 386 | throw new IllegalArgumentException("Unsupported 'inputChannel' type: '"
|
@@ -414,19 +414,15 @@ private ReactiveStreamsConsumer reactiveStreamsConsumer(MessageChannel channel,
|
414 | 414 | return reactiveStreamsConsumer;
|
415 | 415 | }
|
416 | 416 |
|
417 |
| - private PollingConsumer pollingConsumer(MessageChannel inputChannel, MessageHandler handler, Poller[] pollers) { |
| 417 | + private PollingConsumer pollingConsumer(MessageChannel inputChannel, MessageHandler handler, Poller poller) { |
418 | 418 | PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) inputChannel, handler);
|
419 |
| - configurePollingEndpoint(pollingConsumer, pollers); |
| 419 | + configurePollingEndpoint(pollingConsumer, poller); |
420 | 420 | return pollingConsumer;
|
421 | 421 | }
|
422 | 422 |
|
423 |
| - protected void configurePollingEndpoint(AbstractPollingEndpoint pollingEndpoint, Poller[] pollers) { |
| 423 | + protected void configurePollingEndpoint(AbstractPollingEndpoint pollingEndpoint, Poller poller) { |
424 | 424 | PollerMetadata pollerMetadata;
|
425 |
| - if (!ObjectUtils.isEmpty(pollers)) { |
426 |
| - Assert.state(pollers.length == 1, |
427 |
| - "The 'poller' for an Annotation-based endpoint can have only one '@Poller'."); |
428 |
| - Poller poller = pollers[0]; |
429 |
| - |
| 425 | + if (poller != null) { |
430 | 426 | String ref = poller.value();
|
431 | 427 | String triggerRef = poller.trigger();
|
432 | 428 | String executorRef = poller.taskExecutor();
|
|
0 commit comments