Skip to content

Commit 2b576c5

Browse files
committed
GH-8950: Fix PublisherIntegrationFlow for AbstractEndpoint
Fixes: #8950 The `IntegrationFlow.toReactivePublisher(true)` makes `PublisherIntegrationFlow` to be stopped from the beginning and waiting for the `Publisher.subscribe()`. However only `EndpointSpec` components are marked as `autoStartup(false)`. The `MessageProducerSupport` is not included, therefore unexpected messages are produced to the channel in the end of flow without subscribers. * Check for `AbstractEndpoint` component type in the `PublisherIntegrationFlow` and mark it as `setAutoStartup(false)` * Ensure in a new `ReactiveStreamsTests.messageProducerIsNotStartedAutomatically()` test that `MessageProducerSupport` is not started from the beginning, but rather when we subscribe to the `Publisher` via `StepVerifier` **Auto-cherry-pick to `6.2.x` & `6.1.x`**
1 parent 3751206 commit 2b576c5

File tree

2 files changed

+53
-4
lines changed

2 files changed

+53
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import org.reactivestreams.Subscriber;
2323
import reactor.core.publisher.Flux;
2424

25+
import org.springframework.integration.endpoint.AbstractEndpoint;
2526
import org.springframework.messaging.Message;
2627

2728
/**
@@ -48,8 +49,11 @@ class PublisherIntegrationFlow<T> extends StandardIntegrationFlow implements Pub
4849
if (autoStartOnSubscribe) {
4950
flux = flux.doOnSubscribe((sub) -> start());
5051
for (Object component : integrationComponents.keySet()) {
51-
if (component instanceof EndpointSpec) {
52-
((EndpointSpec<?, ?, ?>) component).autoStartup(false);
52+
if (component instanceof EndpointSpec<?, ?, ?> endpointSpec) {
53+
endpointSpec.autoStartup(false);
54+
}
55+
else if (component instanceof AbstractEndpoint endpoint) {
56+
endpoint.setAutoStartup(false);
5357
}
5458
}
5559
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
3434
import reactor.core.Disposable;
3535
import reactor.core.publisher.Flux;
3636
import reactor.core.scheduler.Schedulers;
37+
import reactor.test.StepVerifier;
3738

3839
import org.springframework.beans.factory.annotation.Autowired;
3940
import org.springframework.beans.factory.annotation.Qualifier;
@@ -45,8 +46,11 @@
4546
import org.springframework.integration.config.EnableIntegration;
4647
import org.springframework.integration.dsl.IntegrationFlow;
4748
import org.springframework.integration.dsl.MessageChannels;
49+
import org.springframework.integration.dsl.MessageProducerSpec;
4850
import org.springframework.integration.dsl.context.IntegrationFlowContext;
4951
import org.springframework.integration.endpoint.AbstractEndpoint;
52+
import org.springframework.integration.endpoint.MessageProducerSupport;
53+
import org.springframework.integration.endpoint.ReactiveMessageSourceProducer;
5054
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
5155
import org.springframework.messaging.Message;
5256
import org.springframework.messaging.MessageChannel;
@@ -240,6 +244,27 @@ void fixedSubscriberChannelFlowTest() throws InterruptedException {
240244
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
241245
}
242246

247+
@Autowired
248+
MessageProducerSupport testMessageProducer;
249+
250+
@Autowired
251+
Publisher<Message<String>> messageProducerFlow;
252+
253+
@Test
254+
void messageProducerIsNotStartedAutomatically() {
255+
assertThat(this.testMessageProducer.isRunning()).isFalse();
256+
257+
Flux<String> flux =
258+
Flux.from(this.messageProducerFlow)
259+
.map(Message::getPayload);
260+
261+
StepVerifier.create(flux)
262+
.expectNext("test")
263+
.expectNext("test")
264+
.thenCancel()
265+
.verify(Duration.ofSeconds(10));
266+
}
267+
243268
@Configuration
244269
@EnableIntegration
245270
public static class ContextConfiguration {
@@ -287,6 +312,26 @@ public Publisher<Message<String>> fixedSubscriberChannelFlow() {
287312
.toReactivePublisher();
288313
}
289314

315+
@Bean
316+
public Publisher<Message<String>> messageProducerFlow() {
317+
TestMessageProducerSpec testMessageProducerSpec =
318+
new TestMessageProducerSpec(new ReactiveMessageSourceProducer(() -> new GenericMessage<>("test")))
319+
.id("testMessageProducer");
320+
321+
return IntegrationFlow
322+
.from(testMessageProducerSpec)
323+
.toReactivePublisher(true);
324+
}
325+
326+
}
327+
328+
private static class TestMessageProducerSpec
329+
extends MessageProducerSpec<TestMessageProducerSpec, ReactiveMessageSourceProducer> {
330+
331+
TestMessageProducerSpec(ReactiveMessageSourceProducer producer) {
332+
super(producer);
333+
}
334+
290335
}
291336

292337
}

0 commit comments

Comments
 (0)