Skip to content

Commit 7946870

Browse files
committed
Add Reactive mode for AbstractPollingEndpoint
* When `SourcePollingChannelAdapter.outputChannel` is a `ReactiveStreamsSubscribableChannel`, use `Flux.generate()` for polling * Refactor `AbstractPollingEndpoint` to remove redundant `Poller` class in favor of lambda * Extract `pollForMessage()` method to handle TX states instead of `Poller` class previously
1 parent b5b1009 commit 7946870

File tree

8 files changed

+317
-141
lines changed

8 files changed

+317
-141
lines changed

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractPollingEndpoint.java

Lines changed: 161 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@
1616

1717
package org.springframework.integration.endpoint;
1818

19+
import java.time.Duration;
1920
import java.util.Collection;
21+
import java.util.Date;
2022
import java.util.List;
2123
import java.util.concurrent.Callable;
2224
import java.util.concurrent.Executor;
2325
import java.util.concurrent.ScheduledFuture;
2426
import java.util.stream.Collectors;
2527

2628
import org.aopalliance.aop.Advice;
29+
import org.reactivestreams.Subscription;
2730

2831
import org.springframework.aop.framework.ProxyFactory;
2932
import org.springframework.beans.factory.BeanClassLoaderAware;
@@ -43,6 +46,7 @@
4346
import org.springframework.messaging.MessagingException;
4447
import org.springframework.scheduling.Trigger;
4548
import org.springframework.scheduling.support.PeriodicTrigger;
49+
import org.springframework.scheduling.support.SimpleTriggerContext;
4650
import org.springframework.transaction.interceptor.TransactionInterceptor;
4751
import org.springframework.transaction.support.TransactionSynchronization;
4852
import org.springframework.transaction.support.TransactionSynchronizationManager;
@@ -51,6 +55,10 @@
5155
import org.springframework.util.CollectionUtils;
5256
import org.springframework.util.ErrorHandler;
5357

58+
import reactor.core.publisher.Flux;
59+
import reactor.core.publisher.Mono;
60+
import reactor.core.scheduler.Schedulers;
61+
5462
/**
5563
* @author Mark Fisher
5664
* @author Oleg Zhurakousky
@@ -64,25 +72,27 @@ public abstract class AbstractPollingEndpoint extends AbstractEndpoint implement
6472

6573
private Executor taskExecutor = new SyncTaskExecutor();
6674

67-
private boolean syncExecutor = true;
75+
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
76+
77+
private Trigger trigger = new PeriodicTrigger(10);
78+
79+
private long maxMessagesPerPoll = -1;
6880

6981
private ErrorHandler errorHandler;
7082

7183
private boolean errorHandlerIsDefault;
7284

73-
private Trigger trigger = new PeriodicTrigger(10);
74-
7585
private List<Advice> adviceChain;
7686

77-
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
87+
private TransactionSynchronizationFactory transactionSynchronizationFactory;
7888

79-
private long maxMessagesPerPoll = -1;
89+
private volatile Callable<Message<?>> pollingTask;
8090

81-
private TransactionSynchronizationFactory transactionSynchronizationFactory;
91+
private volatile Flux<Message<?>> pollingFlux;
8292

83-
private volatile ScheduledFuture<?> runningTask;
93+
private volatile Subscription subscription;
8494

85-
private volatile Runnable poller;
95+
private volatile ScheduledFuture<?> runningTask;
8696

8797
private volatile boolean initialized;
8898

@@ -167,6 +177,14 @@ protected boolean isReceiveOnlyAdvice(Advice advice) {
167177
protected void applyReceiveOnlyAdviceChain(Collection<Advice> chain) {
168178
}
169179

180+
protected boolean isReactive() {
181+
return false;
182+
}
183+
184+
protected Flux<Message<?>> getPollingFlux() {
185+
return this.pollingFlux;
186+
}
187+
170188
@Override
171189
protected void onInit() {
172190
synchronized (this.initializationMonitor) {
@@ -200,16 +218,38 @@ protected void onInit() {
200218
}
201219
}
202220

221+
// LifecycleSupport implementation
222+
223+
@Override // guarded by super#lifecycleLock
224+
protected void doStart() {
225+
if (!this.initialized) {
226+
onInit();
227+
}
228+
229+
this.pollingTask = createPollingTask();
230+
231+
if (isReactive()) {
232+
this.pollingFlux = createFluxGenerator();
233+
}
234+
else {
235+
Assert.state(getTaskScheduler() != null, "unable to start polling, no taskScheduler available");
236+
237+
this.runningTask =
238+
getTaskScheduler()
239+
.schedule(createPoller(), this.trigger);
240+
}
241+
}
242+
203243
@SuppressWarnings("unchecked")
204-
private Runnable createPoller() throws Exception {
244+
private Callable<Message<?>> createPollingTask() {
205245
List<Advice> receiveOnlyAdviceChain = null;
206246
if (!CollectionUtils.isEmpty(this.adviceChain)) {
207247
receiveOnlyAdviceChain = this.adviceChain.stream()
208248
.filter(this::isReceiveOnlyAdvice)
209249
.collect(Collectors.toList());
210250
}
211251

212-
Callable<Boolean> pollingTask = this::doPoll;
252+
Callable<Message<?>> pollingTask = this::doPoll;
213253

214254
List<Advice> adviceChain = this.adviceChain;
215255
if (!CollectionUtils.isEmpty(adviceChain)) {
@@ -219,65 +259,122 @@ private Runnable createPoller() throws Exception {
219259
.filter(advice -> !isReceiveOnlyAdvice(advice))
220260
.forEach(proxyFactory::addAdvice);
221261
}
222-
pollingTask = (Callable<Boolean>) proxyFactory.getProxy(this.beanClassLoader);
262+
pollingTask = (Callable<Message<?>>) proxyFactory.getProxy(this.beanClassLoader);
223263
}
224264
if (!CollectionUtils.isEmpty(receiveOnlyAdviceChain)) {
225265
applyReceiveOnlyAdviceChain(receiveOnlyAdviceChain);
226266
}
227-
return new Poller(pollingTask);
267+
268+
return pollingTask;
228269
}
229270

230-
// LifecycleSupport implementation
271+
private Runnable createPoller() {
272+
return () ->
273+
this.taskExecutor.execute(() -> {
274+
int count = 0;
275+
while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
276+
if (pollForMessage() == null) {
277+
break;
278+
}
279+
count++;
280+
}
281+
});
282+
}
231283

232-
@Override // guarded by super#lifecycleLock
233-
protected void doStart() {
234-
if (!this.initialized) {
235-
this.onInit();
236-
}
237-
Assert.state(this.getTaskScheduler() != null,
238-
"unable to start polling, no taskScheduler available");
284+
private Flux<Message<?>> createFluxGenerator() {
285+
SimpleTriggerContext triggerContext = new SimpleTriggerContext();
286+
287+
return Flux
288+
.<Duration>generate(sink -> {
289+
Date date = this.trigger.nextExecutionTime(triggerContext);
290+
if (date != null) {
291+
triggerContext.update(date, null, null);
292+
long millis = date.getTime() - System.currentTimeMillis();
293+
sink.next(Duration.ofMillis(millis));
294+
}
295+
else {
296+
sink.complete();
297+
}
298+
})
299+
.concatMap(duration ->
300+
Mono.delay(duration)
301+
.doOnNext(l ->
302+
triggerContext.update(triggerContext.lastScheduledExecutionTime(),
303+
new Date(), null))
304+
.flatMapMany(l ->
305+
Flux
306+
.<Message<?>>generate(fluxSink -> {
307+
Message<?> message = pollForMessage();
308+
if (message != null) {
309+
fluxSink.next(message);
310+
}
311+
else {
312+
fluxSink.complete();
313+
}
314+
})
315+
.take(this.maxMessagesPerPoll)
316+
.subscribeOn(Schedulers.fromExecutor(this.taskExecutor))
317+
.doOnComplete(() ->
318+
triggerContext.update(triggerContext.lastScheduledExecutionTime(),
319+
triggerContext.lastActualExecutionTime(),
320+
new Date())
321+
)), 1)
322+
.repeat(this::isRunning)
323+
.doOnSubscribe(subscription -> this.subscription = subscription);
324+
}
325+
326+
private Message<?> pollForMessage() {
239327
try {
240-
this.poller = createPoller();
328+
return this.pollingTask.call();
241329
}
242330
catch (Exception e) {
243-
this.initialized = false;
244-
throw new MessagingException("Failed to create Poller", e);
331+
if (e instanceof MessagingException) {
332+
throw (MessagingException) e;
333+
}
334+
else {
335+
Message<?> failedMessage = null;
336+
if (this.transactionSynchronizationFactory != null) {
337+
Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
338+
if (resource instanceof IntegrationResourceHolder) {
339+
failedMessage = ((IntegrationResourceHolder) resource).getMessage();
340+
}
341+
}
342+
throw new MessagingException(failedMessage, e);
343+
}
245344
}
246-
this.runningTask = this.getTaskScheduler().schedule(this.poller, this.trigger);
247-
}
248-
249-
@Override // guarded by super#lifecycleLock
250-
protected void doStop() {
251-
if (this.runningTask != null) {
252-
this.runningTask.cancel(true);
345+
finally {
346+
if (this.transactionSynchronizationFactory != null) {
347+
Object resource = getResourceToBind();
348+
if (TransactionSynchronizationManager.hasResource(resource)) {
349+
TransactionSynchronizationManager.unbindResource(resource);
350+
}
351+
}
253352
}
254-
this.runningTask = null;
255353
}
256354

257-
private boolean doPoll() {
258-
IntegrationResourceHolder holder = this.bindResourceHolderIfNecessary(
259-
this.getResourceKey(), this.getResourceToBind());
260-
Message<?> message = null;
355+
private Message<?> doPoll() {
356+
IntegrationResourceHolder holder = bindResourceHolderIfNecessary(getResourceKey(), getResourceToBind());
357+
Message<?> message;
261358
try {
262-
message = this.receiveMessage();
359+
message = receiveMessage();
263360
}
264361
catch (Exception e) {
265362
if (Thread.interrupted()) {
266363
if (logger.isDebugEnabled()) {
267364
logger.debug("Poll interrupted - during stop()? : " + e.getMessage());
268365
}
269-
return false;
366+
return null;
270367
}
271368
else {
272369
throw (RuntimeException) e;
273370
}
274371
}
275-
boolean result;
372+
276373
if (message == null) {
277374
if (this.logger.isDebugEnabled()) {
278375
this.logger.debug("Received no Message during the poll, returning 'false'");
279376
}
280-
result = false;
377+
return null;
281378
}
282379
else {
283380
if (this.logger.isDebugEnabled()) {
@@ -286,20 +383,35 @@ private boolean doPoll() {
286383
if (holder != null) {
287384
holder.setMessage(message);
288385
}
289-
try {
290-
this.handleMessage(message);
291-
}
292-
catch (Exception e) {
293-
if (e instanceof MessagingException) {
294-
throw new MessagingExceptionWrapper(message, (MessagingException) e);
386+
387+
if (!isReactive()) {
388+
try {
389+
handleMessage(message);
295390
}
296-
else {
297-
throw new MessagingException(message, e);
391+
catch (Exception e) {
392+
if (e instanceof MessagingException) {
393+
throw new MessagingExceptionWrapper(message, (MessagingException) e);
394+
}
395+
else {
396+
throw new MessagingException(message, e);
397+
}
298398
}
299399
}
300-
result = true;
301400
}
302-
return result;
401+
402+
return message;
403+
}
404+
405+
@Override // guarded by super#lifecycleLock
406+
protected void doStop() {
407+
if (this.runningTask != null) {
408+
this.runningTask.cancel(true);
409+
}
410+
this.runningTask = null;
411+
412+
if (this.subscription != null) {
413+
this.subscription.cancel();
414+
}
303415
}
304416

305417
/**
@@ -369,57 +481,4 @@ private IntegrationResourceHolder bindResourceHolderIfNecessary(String key, Obje
369481
return null;
370482
}
371483

372-
/**
373-
* Default Poller implementation
374-
*/
375-
private final class Poller implements Runnable {
376-
377-
private final Callable<Boolean> pollingTask;
378-
379-
Poller(Callable<Boolean> pollingTask) {
380-
this.pollingTask = pollingTask;
381-
}
382-
383-
@Override
384-
public void run() {
385-
AbstractPollingEndpoint.this.taskExecutor.execute(() -> {
386-
int count = 0;
387-
while (AbstractPollingEndpoint.this.initialized
388-
&& (AbstractPollingEndpoint.this.maxMessagesPerPoll <= 0
389-
|| count < AbstractPollingEndpoint.this.maxMessagesPerPoll)) {
390-
try {
391-
if (!Poller.this.pollingTask.call()) {
392-
break;
393-
}
394-
count++;
395-
}
396-
catch (Exception e) {
397-
if (e instanceof MessagingException) {
398-
throw (MessagingException) e;
399-
}
400-
else {
401-
Message<?> failedMessage = null;
402-
if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
403-
Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
404-
if (resource instanceof IntegrationResourceHolder) {
405-
failedMessage = ((IntegrationResourceHolder) resource).getMessage();
406-
}
407-
}
408-
throw new MessagingException(failedMessage, e);
409-
}
410-
}
411-
finally {
412-
if (AbstractPollingEndpoint.this.transactionSynchronizationFactory != null) {
413-
Object resource = getResourceToBind();
414-
if (TransactionSynchronizationManager.hasResource(resource)) {
415-
TransactionSynchronizationManager.unbindResource(resource);
416-
}
417-
}
418-
}
419-
}
420-
});
421-
}
422-
423-
}
424-
425484
}

0 commit comments

Comments
 (0)