Skip to content

Commit 768e26f

Browse files
artembilanspring-builds
authored andcommitted
Revise subscribed state in the FluxMessageChannel
The `Sinks.Many<Boolean> subscribedSignal` is a bit of overhead in the logic. * Use `Mono.fromCallable(this.sink::currentSubscriberCount)` instead for `delaySubscription()` when we perform `subscribeTo(Publisher)` (cherry picked from commit bdcb856)
1 parent 841a9f5 commit 768e26f

File tree

1 file changed

+5
-15
lines changed

1 file changed

+5
-15
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ public class FluxMessageChannel extends AbstractMessageChannel
5454

5555
private final Sinks.Many<Message<?>> sink = Sinks.many().multicast().onBackpressureBuffer(1, false);
5656

57-
private final Sinks.Many<Boolean> subscribedSignal = Sinks.many().replay().limit(1);
58-
5957
private final Disposable.Composite upstreamSubscriptions = Disposables.composite();
6058

6159
private volatile boolean active = true;
@@ -102,19 +100,9 @@ private boolean tryEmitMessage(Message<?> message) {
102100
@Override
103101
public void subscribe(Subscriber<? super Message<?>> subscriber) {
104102
this.sink.asFlux()
105-
.doFinally((s) -> this.subscribedSignal.tryEmitNext(this.sink.currentSubscriberCount() > 0))
106103
.publish(1)
107104
.refCount()
108105
.subscribe(subscriber);
109-
110-
Mono<Boolean> subscribersBarrier =
111-
Mono.fromCallable(() -> this.sink.currentSubscriberCount() > 0)
112-
.filter(Boolean::booleanValue)
113-
.doOnNext(this.subscribedSignal::tryEmitNext)
114-
.repeatWhenEmpty((repeat) ->
115-
this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat); // NOSONAR
116-
117-
addPublisherToSubscribe(Flux.from(subscribersBarrier));
118106
}
119107

120108
private void addPublisherToSubscribe(Flux<?> publisher) {
@@ -144,8 +132,11 @@ private void disposeUpstreamSubscription(AtomicReference<Disposable> disposableR
144132
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
145133
Flux<Object> upstreamPublisher =
146134
Flux.from(publisher)
147-
.delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next())
148-
// .publishOn(this.scheduler)
135+
.delaySubscription(
136+
Mono.fromCallable(this.sink::currentSubscriberCount)
137+
.filter((value) -> value > 0)
138+
.repeatWhenEmpty((repeat) ->
139+
this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat))
149140
.flatMap((message) ->
150141
Mono.just(message)
151142
.handle((messageToHandle, syncSink) -> sendReactiveMessage(messageToHandle))
@@ -180,7 +171,6 @@ private void sendReactiveMessage(Message<?> message) {
180171
public void destroy() {
181172
this.active = false;
182173
this.upstreamSubscriptions.dispose();
183-
this.subscribedSignal.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
184174
this.sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
185175
super.destroy();
186176
}

0 commit comments

Comments
 (0)