Skip to content

Commit bf6c4f6

Browse files
artembilanspring-builds
authored andcommitted
GH-9215: Honor back-pressure in FluxMessageChannel
Fixes: #9215 * Instead of `share()` use `publish(1).refCount()` to prefetch only item from upstream. * Also remove `publishOn(this.scheduler)` for upstream publishers in favor of opt-in on the consumer side. (cherry picked from commit e9561b4)
1 parent d22737c commit bf6c4f6

File tree

1 file changed

+3
-7
lines changed

1 file changed

+3
-7
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import reactor.core.publisher.Flux;
2929
import reactor.core.publisher.Mono;
3030
import reactor.core.publisher.Sinks;
31-
import reactor.core.scheduler.Scheduler;
32-
import reactor.core.scheduler.Schedulers;
3331
import reactor.util.context.ContextView;
3432

3533
import org.springframework.core.log.LogMessage;
@@ -54,8 +52,6 @@
5452
public class FluxMessageChannel extends AbstractMessageChannel
5553
implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {
5654

57-
private final Scheduler scheduler = Schedulers.boundedElastic();
58-
5955
private final Sinks.Many<Message<?>> sink = Sinks.many().multicast().onBackpressureBuffer(1, false);
6056

6157
private final Sinks.Many<Boolean> subscribedSignal = Sinks.many().replay().limit(1);
@@ -107,7 +103,8 @@ private boolean tryEmitMessage(Message<?> message) {
107103
public void subscribe(Subscriber<? super Message<?>> subscriber) {
108104
this.sink.asFlux()
109105
.doFinally((s) -> this.subscribedSignal.tryEmitNext(this.sink.currentSubscriberCount() > 0))
110-
.share()
106+
.publish(1)
107+
.refCount()
111108
.subscribe(subscriber);
112109

113110
Mono<Boolean> subscribersBarrier =
@@ -148,7 +145,7 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
148145
Flux<Object> upstreamPublisher =
149146
Flux.from(publisher)
150147
.delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next())
151-
.publishOn(this.scheduler)
148+
// .publishOn(this.scheduler)
152149
.flatMap((message) ->
153150
Mono.just(message)
154151
.handle((messageToHandle, syncSink) -> sendReactiveMessage(messageToHandle))
@@ -185,7 +182,6 @@ public void destroy() {
185182
this.upstreamSubscriptions.dispose();
186183
this.subscribedSignal.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
187184
this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
188-
this.scheduler.dispose();
189185
super.destroy();
190186
}
191187

0 commit comments

Comments
 (0)