Skip to content

Commit ccd3574

Browse files
committed
Fix memory leak in the FluxMessageChannel (#8622)
The `FluxMessageChannel` can subscribe to any volatile `Publisher`. For example, we can call Reactor Kafka `Sender.send()` for input data and pass its result to the `FluxMessageChannel` for on demand subscription. These publishers are subscribed in the `FluxMessageChannel` and their `Disposable` is stored in the internal `Disposable.Composite` which currently only cleared on `destroy()` * Extract `Disposable` from those internal `subscribe()` calls into an `AtomicReference`. * Use this `AtomicReference` in the `doOnTerminate()` to remove from the `Disposable.Composite` and `dispose()` when such a volatile `Publisher` is completed **Cherry-pick to `6.0.x` & `5.5.x`** # Conflicts: # spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java
1 parent 4ddc525 commit ccd3574

File tree

2 files changed

+53
-6
lines changed

2 files changed

+53
-6
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/FluxMessageChannel.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.time.Duration;
2020
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicReference;
2122
import java.util.concurrent.locks.LockSupport;
2223

2324
import org.reactivestreams.Publisher;
@@ -102,18 +103,42 @@ public void subscribe(Subscriber<? super Message<?>> subscriber) {
102103
.share()
103104
.subscribe(subscriber);
104105

105-
this.upstreamSubscriptions.add(
106+
Mono<Boolean> subscribersBarrier =
106107
Mono.fromCallable(() -> this.sink.currentSubscriberCount() > 0)
107108
.filter(Boolean::booleanValue)
108109
.doOnNext(this.subscribedSignal::tryEmitNext)
109110
.repeatWhenEmpty((repeat) ->
110-
this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat) // NOSONAR
111-
.subscribe());
111+
this.active ? repeat.delayElements(Duration.ofMillis(100)) : repeat); // NOSONAR
112+
113+
addPublisherToSubscribe(Flux.from(subscribersBarrier));
114+
}
115+
116+
private void addPublisherToSubscribe(Flux<?> publisher) {
117+
AtomicReference<Disposable> disposableReference = new AtomicReference<>();
118+
119+
Disposable disposable =
120+
publisher
121+
.doOnTerminate(() -> disposeUpstreamSubscription(disposableReference))
122+
.subscribe();
123+
124+
if (!disposable.isDisposed()) {
125+
if (this.upstreamSubscriptions.add(disposable)) {
126+
disposableReference.set(disposable);
127+
}
128+
}
129+
}
130+
131+
private void disposeUpstreamSubscription(AtomicReference<Disposable> disposableReference) {
132+
Disposable disposable = disposableReference.get();
133+
if (disposable != null) {
134+
this.upstreamSubscriptions.remove(disposable);
135+
disposable.dispose();
136+
}
112137
}
113138

114139
@Override
115140
public void subscribeTo(Publisher<? extends Message<?>> publisher) {
116-
this.upstreamSubscriptions.add(
141+
Flux<?> upstreamPublisher =
117142
Flux.from(publisher)
118143
.delaySubscription(this.subscribedSignal.asFlux().filter(Boolean::booleanValue).next())
119144
.publishOn(this.scheduler)
@@ -127,8 +152,9 @@ public void subscribeTo(Publisher<? extends Message<?>> publisher) {
127152
catch (Exception ex) {
128153
logger.warn(ex, () -> "Error during processing event: " + message);
129154
}
130-
})
131-
.subscribe());
155+
});
156+
157+
addPublisherToSubscribe(upstreamPublisher);
132158
}
133159

134160
@Override

spring-integration-core/src/test/java/org/springframework/integration/channel/reactive/FluxMessageChannelTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.junit.jupiter.api.Test;
2727
import reactor.core.Disposable;
2828
import reactor.core.publisher.Flux;
29+
import reactor.core.publisher.Mono;
30+
import reactor.test.StepVerifier;
2931

3032
import org.springframework.beans.factory.annotation.Autowired;
3133
import org.springframework.context.annotation.Bean;
@@ -142,6 +144,25 @@ void testFluxMessageChannelCleanUp() throws InterruptedException {
142144
assertThat(TestUtils.getPropertyValue(flux, "sink.sink.done", Boolean.class)).isTrue();
143145
}
144146

147+
@Test
148+
void noMemoryLeakInFluxMessageChannelForVolatilePublishers() {
149+
FluxMessageChannel messageChannel = new FluxMessageChannel();
150+
151+
StepVerifier stepVerifier = StepVerifier.create(messageChannel)
152+
.expectNextCount(3)
153+
.thenCancel()
154+
.verifyLater();
155+
156+
messageChannel.subscribeTo(Mono.just(new GenericMessage<>("test")));
157+
messageChannel.subscribeTo(Flux.just("test1", "test2").map(GenericMessage::new));
158+
159+
stepVerifier.verify();
160+
161+
Disposable.Composite upstreamSubscriptions =
162+
TestUtils.getPropertyValue(messageChannel, "upstreamSubscriptions", Disposable.Composite.class);
163+
assertThat(upstreamSubscriptions.size()).isEqualTo(0);
164+
}
165+
145166
@Configuration
146167
@EnableIntegration
147168
public static class TestConfiguration {

0 commit comments

Comments
 (0)