Skip to content

Commit c8477ad

Browse files
committed
GH-8950: Fix PublisherIntegrationFlow for AbstractEndpoint
Fixes: #8950 The `IntegrationFlow.toReactivePublisher(true)` makes `PublisherIntegrationFlow` to be stopped from the beginning and waiting for the `Publisher.subscribe()`. However only `EndpointSpec` components are marked as `autoStartup(false)`. The `MessageProducerSupport` is not included, therefore unexpected messages are produced to the channel in the end of flow without subscribers. * Check for `AbstractEndpoint` component type in the `PublisherIntegrationFlow` and mark it as `setAutoStartup(false)` * Ensure in a new `ReactiveStreamsTests.messageProducerIsNotStartedAutomatically()` test that `MessageProducerSupport` is not started from the beginning, but rather when we subscribe to the `Publisher` via `StepVerifier` (cherry picked from commit 2b576c5) # Conflicts: # spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java
1 parent 6a2f688 commit c8477ad

File tree

2 files changed

+53
-4
lines changed

2 files changed

+53
-4
lines changed

spring-integration-core/src/main/java/org/springframework/integration/dsl/PublisherIntegrationFlow.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import org.reactivestreams.Subscriber;
2323
import reactor.core.publisher.Flux;
2424

25+
import org.springframework.integration.endpoint.AbstractEndpoint;
2526
import org.springframework.messaging.Message;
2627

2728
/**
@@ -48,8 +49,11 @@ class PublisherIntegrationFlow<T> extends StandardIntegrationFlow implements Pub
4849
if (autoStartOnSubscribe) {
4950
flux = flux.doOnSubscribe((sub) -> start());
5051
for (Object component : integrationComponents.keySet()) {
51-
if (component instanceof EndpointSpec) {
52-
((EndpointSpec<?, ?, ?>) component).autoStartup(false);
52+
if (component instanceof EndpointSpec<?, ?, ?> endpointSpec) {
53+
endpointSpec.autoStartup(false);
54+
}
55+
else if (component instanceof AbstractEndpoint endpoint) {
56+
endpoint.setAutoStartup(false);
5357
}
5458
}
5559
}

spring-integration-core/src/test/java/org/springframework/integration/dsl/reactivestreams/ReactiveStreamsTests.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
3434
import reactor.core.Disposable;
3535
import reactor.core.publisher.Flux;
3636
import reactor.core.scheduler.Schedulers;
37+
import reactor.test.StepVerifier;
3738

3839
import org.springframework.beans.factory.annotation.Autowired;
3940
import org.springframework.beans.factory.annotation.Qualifier;
@@ -45,8 +46,11 @@
4546
import org.springframework.integration.config.EnableIntegration;
4647
import org.springframework.integration.dsl.IntegrationFlow;
4748
import org.springframework.integration.dsl.MessageChannels;
49+
import org.springframework.integration.dsl.MessageProducerSpec;
4850
import org.springframework.integration.dsl.context.IntegrationFlowContext;
4951
import org.springframework.integration.endpoint.AbstractEndpoint;
52+
import org.springframework.integration.endpoint.MessageProducerSupport;
53+
import org.springframework.integration.endpoint.ReactiveMessageSourceProducer;
5054
import org.springframework.integration.endpoint.ReactiveStreamsConsumer;
5155
import org.springframework.messaging.Message;
5256
import org.springframework.messaging.MessageChannel;
@@ -240,6 +244,27 @@ void fixedSubscriberChannelFlowTest() throws InterruptedException {
240244
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
241245
}
242246

247+
@Autowired
248+
MessageProducerSupport testMessageProducer;
249+
250+
@Autowired
251+
Publisher<Message<String>> messageProducerFlow;
252+
253+
@Test
254+
void messageProducerIsNotStartedAutomatically() {
255+
assertThat(this.testMessageProducer.isRunning()).isFalse();
256+
257+
Flux<String> flux =
258+
Flux.from(this.messageProducerFlow)
259+
.map(Message::getPayload);
260+
261+
StepVerifier.create(flux)
262+
.expectNext("test")
263+
.expectNext("test")
264+
.thenCancel()
265+
.verify(Duration.ofSeconds(10));
266+
}
267+
243268
@Configuration
244269
@EnableIntegration
245270
public static class ContextConfiguration {
@@ -285,6 +310,26 @@ public Publisher<Message<String>> fixedSubscriberChannelFlow() {
285310
.toReactivePublisher();
286311
}
287312

313+
@Bean
314+
public Publisher<Message<String>> messageProducerFlow() {
315+
TestMessageProducerSpec testMessageProducerSpec =
316+
new TestMessageProducerSpec(new ReactiveMessageSourceProducer(() -> new GenericMessage<>("test")))
317+
.id("testMessageProducer");
318+
319+
return IntegrationFlow
320+
.from(testMessageProducerSpec)
321+
.toReactivePublisher(true);
322+
}
323+
324+
}
325+
326+
private static class TestMessageProducerSpec
327+
extends MessageProducerSpec<TestMessageProducerSpec, ReactiveMessageSourceProducer> {
328+
329+
TestMessageProducerSpec(ReactiveMessageSourceProducer producer) {
330+
super(producer);
331+
}
332+
288333
}
289334

290335
}

0 commit comments

Comments
 (0)